This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch fix_401_correct_chunk_size in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 6d65b743d62a2600159b5cd1a246faa422565424 Author: xiangdong huang <[email protected]> AuthorDate: Mon Jan 6 21:47:06 2020 +0800 IOTDB-401, skip the size of a chunk header if the chunk has no data points --- .../apache/iotdb/db/tools/TsFileSketchTool.java | 11 +- .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 3 + .../iotdb/tsfile/write/chunk/IChunkWriter.java | 2 + .../iotdb/tsfile/write/TsFileWriterTest.java | 255 +++++++++++++++++++++ 4 files changed, 269 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java index fbeba9d..2169498 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java @@ -130,8 +130,15 @@ public class TsFileSketchTool { } // metadata begins - printlnBoth(pw, String.format("%20s", tsDeviceMetadataIndexSortedList.get(0).getOffset() - 1) - + "|\t[marker] 2"); + if (tsDeviceMetadataIndexSortedList.size() != 0) { + printlnBoth(pw, + String.format("%20s", tsDeviceMetadataIndexSortedList.get(0).getOffset() - 1) + + "|\t[marker] 2"); + } else { + printlnBoth(pw, + String.format("%20s", reader.getFileMetadataPos() - 1) + + "|\t[marker] 2"); + } for ( int i = 0; i < tsDeviceMetadataSortedList.size(); i++) { TsDeviceMetadata tsDeviceMetadata = tsDeviceMetadataSortedList.get(i); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java index 486cf41..8d11612 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java @@ -231,6 +231,9 @@ public class ChunkWriterImpl implements IChunkWriter { @Override public long getCurrentChunkSize() { + if (this.getCurrentDataSize() == 0) { + return 0; + } // 
return the serialized size of the chunk header + all pages return ChunkHeader.getSerializedSize(measurementSchema.getMeasurementId()) + this .getCurrentDataSize(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java index c375b45..9259387 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/IChunkWriter.java @@ -103,6 +103,8 @@ public interface IChunkWriter { * return the serialized size of the chunk header + all pages (not including the un-sealed page). * Notice, call this method before calling writeToFileWriter(), otherwise the page buffer in * memory will be cleared. + * <br> If there are no data points in the chunk, return 0 (i.e., in this case, the size of the header + * is not calculated, because nothing will be serialized later). */ long getCurrentChunkSize(); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java new file mode 100644 index 0000000..d23495c --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.write; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import org.apache.iotdb.tsfile.exception.encoding.TsFileEncodingException; +import org.apache.iotdb.tsfile.exception.write.NoMeasurementException; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.ReadOnlyTsFile; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.expression.QueryExpression; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.write.record.RowBatch; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TsFileWriterTest { + TsFileWriter writer = null; + long fileName = 
System.nanoTime(); + boolean closed = false; + @Before + public void setUp() { + try { + writer = new TsFileWriter(new File("target/tsfileWriter-" + fileName)); + addMeasurement(); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @After + public void tearDown() { + if (!closed) { + closeFile(); + } + try { + Files.deleteIfExists(Path.of("target/tsfileWriter-" + fileName)); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void addMeasurement() { + try { + //String measurementId, TSDataType type, TSEncoding encoding, + // CompressionType compressionType + writer.addMeasurement( + new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY)); + } catch (WriteProcessException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + try { + //String measurementId, TSDataType type, TSEncoding encoding, + // CompressionType compressionType + writer.addMeasurement( + new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY)); + } catch (WriteProcessException e) { + Assert.assertEquals("given measurement has exists! 
s1", e.getMessage()); + } + try { + //String measurementId, TSDataType type, TSEncoding encoding, + // CompressionType compressionType + writer.addMeasurement( + new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY)); + } catch (WriteProcessException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + } + + @Test + public void addMeasurementTest() { + closeFile(); + readNothing(); + } + + + private void writeTSRecord() throws IOException, WriteProcessException { + //normal + TSRecord record = new TSRecord( 10000, "d1"); + record.addTuple(new FloatDataPoint("s1", 5.0f)); + record.addTuple(new IntDataPoint("s2", 5)); + writer.write(record); + + //not existed time series + record = new TSRecord( 10001, "d1"); + record.addTuple(new FloatDataPoint("s3", 5.0f)); + try { + writer.write(record); + } catch (WriteProcessException e) { + assertTrue(e instanceof NoMeasurementException); + } + } + + @Test + public void writeTSRecordTest() throws IOException, WriteProcessException { + writeTSRecord(); + closeFile(); + readOneRow(); + } + + @Test + public void writeIncorrectTSRecord0() throws IOException, WriteProcessException { + //incorrect data type + TSRecord record = new TSRecord(10002, "d2"); + record.addTuple(new IntDataPoint("s1", 5)); + try { + writer.write(record); + } catch (TsFileEncodingException e) { + //do nothing + } + closeFile(); + } + + @Test + public void writeIncorrectTSRecord() throws IOException, WriteProcessException { + writeTSRecord(); + //incorrect data type + TSRecord record = new TSRecord(10002, "d2"); + record.addTuple(new IntDataPoint("s1", 5)); + try { + writer.write(record); + } catch (TsFileEncodingException e) { + //do nothing + } + closeFile(); + readOneRow(); + } + + @Test + public void writeRowBatch() throws IOException, WriteProcessException { + RowBatch rowBatch = new RowBatch("d1", Arrays.asList(new MeasurementSchema[]{ + new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, 
CompressionType.SNAPPY), + new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY) + })); + rowBatch.timestamps[0] = 10000; + ((float[])rowBatch.values[0])[0] = 5.0f; + ((int[])rowBatch.values[1])[0] = 5; + rowBatch.batchSize = 1; + writer.write(rowBatch); + closeFile(); + readOneRow(); + } + + @Test + public void writeRowBatch2() throws IOException, WriteProcessException { + RowBatch rowBatch = new RowBatch("d1", Arrays.asList(new MeasurementSchema[]{ + new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY), + new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY) + })); + rowBatch.timestamps[0] = 10000; + ((float[])rowBatch.values[0])[0] = 5.0f; + rowBatch.batchSize = 1; + writer.write(rowBatch); + closeFile(); + //in this case, the value of s2 = 0 at time 10000. + } + + @Test + public void getIOWriter() throws IOException { + //The interface is just for test + writer.getIOWriter(); + } + + @Test + public void flushForTest() throws IOException { + //The interface is just for test + writer.flushForTest(); + } + + private void closeFile() { + try { + closed = true; + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + private void readNothing() { + //using TsFileReader for test + try { + ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile( + new TsFileSequenceReader("target/tsfileWriter-" + fileName)); + QueryDataSet dataSet = readOnlyTsFile.query(QueryExpression.create() + .addSelectedPath(new org.apache.iotdb.tsfile.read.common.Path("d1.s1")) + .addSelectedPath(new org.apache.iotdb.tsfile.read.common.Path("d1.s2"))); + assertFalse(dataSet.hasNext()); + readOnlyTsFile.close(); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + private void readOneRow() { + try { + ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile( + new TsFileSequenceReader("target/tsfileWriter-" + fileName)); + QueryDataSet dataSet = 
readOnlyTsFile.query(QueryExpression.create() + .addSelectedPath(new org.apache.iotdb.tsfile.read.common.Path("d1.s1")) + .addSelectedPath(new org.apache.iotdb.tsfile.read.common.Path("d1.s2")) + .addSelectedPath(new org.apache.iotdb.tsfile.read.common.Path("d1.s3"))); + while(dataSet.hasNext()) { + RowRecord result = dataSet.next(); + assertEquals(2, result.getFields().size()); + assertEquals(10000, result.getTimestamp()); + assertEquals(5.0f, result.getFields().get(0).getFloatV(), 0.00001); + assertEquals(5, result.getFields().get(1).getIntV()); + } + readOnlyTsFile.close(); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +}
