This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch support_set_compression_by_type_1.3 in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 85648499afa9eaf00af953ab6262b575e4091af6 Author: Jiang Tian <[email protected]> AuthorDate: Tue Jun 24 19:05:56 2025 +0800 Support set default compression by data type (#523) (cherry picked from commit de166a7d8bd1a5b3a6bdf4749362d583dfb1a9ef) --- .../apache/tsfile/common/conf/TSFileConfig.java | 120 ++++++ .../tsfile/common/conf/TSFileDescriptor.java | 6 + .../tsfile/write/chunk/AlignedChunkWriterImpl.java | 7 +- .../tsfile/write/schema/MeasurementSchema.java | 6 +- .../tsfile/write/schema/TimeseriesSchema.java | 6 +- .../write/schema/VectorMeasurementSchema.java | 107 +++++- .../apache/tsfile/write/writer/TsFileIOWriter.java | 4 + .../org/apache/tsfile/write/ChunkRewriteTest.java | 425 +++++++++++++++++++++ .../apache/tsfile/write/TsFileIOWriterTest.java | 44 ++- 9 files changed, 684 insertions(+), 41 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java index 72cf59be..a7ef6b6e 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java @@ -102,6 +102,42 @@ public class TSFileConfig implements Serializable { */ private String timeEncoding = "TS_2DIFF"; + /** Encoder of boolean column. Default value is RLE. */ + private String booleanEncoding = "RLE"; + + /** Encoder of int32 and date column. Default value is TS_2DIFF. */ + private String int32Encoding = "TS_2DIFF"; + + /** Encoder of int64 and timestamp column. Default value is TS_2DIFF. */ + private String int64Encoding = "TS_2DIFF"; + + /** Encoder of float column. Default value is GORILLA. */ + private String floatEncoding = "GORILLA"; + + /** Encoder of double column. Default value is GORILLA. */ + private String doubleEncoding = "GORILLA"; + + /** Encoder of string, blob and text column. Default value is PLAIN. 
*/ + private String textEncoding = "PLAIN"; + + /** Compression of boolean column. Defaults to the overall compression. */ + private String booleanCompression = null; + + /** Compression of int32 and date column. Defaults to the overall compression. */ + private String int32Compression = null; + + /** Compression of int64 and timestamp column. Defaults to the overall compression. */ + private String int64Compression = null; + + /** Compression of float column. Defaults to the overall compression. */ + private String floatCompression = null; + + /** Compression of double column. Defaults to the overall compression. */ + private String doubleCompression = null; + + /** Compression of string, blob and text column. Defaults to the overall compression. */ + private String textCompression = null; + /** * Encoder of value series. default value is PLAIN. For int, long data type, TsFile also supports * TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). For float, double data type, TsFile @@ -288,6 +324,66 @@ public class TSFileConfig implements Serializable { return valueEncoder; } + public String getValueEncoder(TSDataType dataType) { + switch (dataType) { + case BOOLEAN: + return booleanEncoding; + case INT32: + case DATE: + return int32Encoding; + case INT64: + case TIMESTAMP: + return int64Encoding; + case FLOAT: + return floatEncoding; + case DOUBLE: + return doubleEncoding; + case STRING: + case BLOB: + case TEXT: + default: + return textEncoding; + } + } + + public CompressionType getCompressor(TSDataType dataType) { + String compressionName; + switch (dataType) { + case BOOLEAN: + compressionName = booleanCompression; + break; + case INT32: + case DATE: + compressionName = int32Compression; + break; + case INT64: + case TIMESTAMP: + compressionName = int64Compression; + break; + case FLOAT: + compressionName = floatCompression; + break; + case DOUBLE: + compressionName = doubleCompression; + break; + case STRING: + case BLOB: + case TEXT: + compressionName = 
textCompression; + break; + default: + compressionName = null; + } + + CompressionType compressionType; + if (compressionName != null) { + compressionType = CompressionType.valueOf(compressionName); + } else { + compressionType = compressor; + } + return compressionType; + } + public void setValueEncoder(String valueEncoder) { this.valueEncoder = valueEncoder; } @@ -568,4 +664,28 @@ public class TSFileConfig implements Serializable { public void setLz4UseJni(boolean lz4UseJni) { this.lz4UseJni = lz4UseJni; } + + public void setBooleanCompression(String booleanCompression) { + this.booleanCompression = booleanCompression; + } + + public void setInt32Compression(String int32Compression) { + this.int32Compression = int32Compression; + } + + public void setInt64Compression(String int64Compression) { + this.int64Compression = int64Compression; + } + + public void setFloatCompression(String floatCompression) { + this.floatCompression = floatCompression; + } + + public void setDoubleCompression(String doubleCompression) { + this.doubleCompression = doubleCompression; + } + + public void setTextCompression(String textCompression) { + this.textCompression = textCompression; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java index 8f2e2c78..d00220fd 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java @@ -81,6 +81,12 @@ public class TSFileDescriptor { writer.setInt(conf::setFloatPrecision, "float_precision"); writer.setString(conf::setValueEncoder, "value_encoder"); writer.setString(conf::setCompressor, "compressor"); + writer.setString(conf::setBooleanCompression, "boolean_compressor"); + writer.setString(conf::setInt32Compression, "int32_compressor"); + writer.setString(conf::setInt64Compression, "int64_compressor"); + 
writer.setString(conf::setFloatCompression, "float_compressor"); + writer.setString(conf::setDoubleCompression, "double_compressor"); + writer.setString(conf::setTextCompression, "text_compressor"); writer.setInt(conf::setBatchSize, "batch_size"); writer.setBoolean(conf::setLz4UseJni, "lz4_use_jni"); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java index 22d310c2..515f8e6e 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java @@ -56,7 +56,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter { timeChunkWriter = new TimeChunkWriter( schema.getMeasurementId(), - schema.getCompressor(), + schema.getTimeCompressor(), schema.getTimeTSEncoding(), schema.getTimeEncoder()); @@ -70,7 +70,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter { valueChunkWriterList.add( new ValueChunkWriter( valueMeasurementIdList.get(i), - schema.getCompressor(), + schema.getValueCompressor(i), valueTSDataTypeList.get(i), valueTSEncodingList.get(i), valueEncoderList.get(i))); @@ -122,7 +122,8 @@ public class AlignedChunkWriterImpl implements IChunkWriter { TSEncoding timeEncoding = TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType(); - CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor(); + CompressionType timeCompression = + TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64); timeChunkWriter = new TimeChunkWriter( "", diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java index 545d9cc2..b3f20ac4 100644 --- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java @@ -63,8 +63,8 @@ public class MeasurementSchema this( measurementId, tsDataType, - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()), - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)), + TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType), null); } @@ -74,7 +74,7 @@ public class MeasurementSchema measurementId, type, encoding, - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSFileDescriptor.getInstance().getConfig().getCompressor(type), null); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java index 0c287e85..21c74265 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java @@ -55,8 +55,8 @@ public class TimeseriesSchema implements Comparable<TimeseriesSchema>, Serializa this( fullPath, tsDataType, - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()), - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)), + TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType), Collections.emptyMap()); } @@ -66,7 +66,7 @@ public class TimeseriesSchema implements Comparable<TimeseriesSchema>, Serializa fullPath, type, encoding, - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSFileDescriptor.getInstance().getConfig().getCompressor(type), Collections.emptyMap()); } diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java index 9b157540..33c21479 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java @@ -47,13 +47,19 @@ public class VectorMeasurementSchema RamUsageEstimator.shallowSizeOfInstance(VectorMeasurementSchema.class); private static final long BUILDER_SIZE = RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class); + private static final byte NO_UNIFIED_COMPRESSOR = -1; private String deviceId; private Map<String, Integer> measurementsToIndexMap; private byte[] types; private byte[] encodings; private TSEncodingBuilder[] encodingConverters; - private byte compressor; + + /** For compatibility of old versions. */ + private byte unifiedCompressor; + + /** [0] is for the time column. */ + private byte[] compressors; public VectorMeasurementSchema() {} @@ -80,7 +86,34 @@ public class VectorMeasurementSchema } this.encodings = encodingsInByte; this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; - this.compressor = compressionType.serialize(); + this.unifiedCompressor = compressionType.serialize(); + } + + public VectorMeasurementSchema( + String deviceId, + String[] subMeasurements, + TSDataType[] types, + TSEncoding[] encodings, + byte[] compressors) { + this.deviceId = deviceId; + this.measurementsToIndexMap = new HashMap<>(); + for (int i = 0; i < subMeasurements.length; i++) { + measurementsToIndexMap.put(subMeasurements[i], i); + } + byte[] typesInByte = new byte[types.length]; + for (int i = 0; i < types.length; i++) { + typesInByte[i] = types[i].serialize(); + } + this.types = typesInByte; + + byte[] encodingsInByte = new byte[encodings.length]; + for (int i = 0; i < encodings.length; i++) { + encodingsInByte[i] = 
encodings[i].serialize(); + } + this.encodings = encodingsInByte; + this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; + this.unifiedCompressor = NO_UNIFIED_COMPRESSOR; + this.compressors = compressors; } public VectorMeasurementSchema(String deviceId, String[] subMeasurements, TSDataType[] types) { @@ -101,7 +134,15 @@ public class VectorMeasurementSchema .serialize(); } this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; - this.compressor = TSFileDescriptor.getInstance().getConfig().getCompressor().serialize(); + this.unifiedCompressor = NO_UNIFIED_COMPRESSOR; + // the first column is time + this.compressors = new byte[subMeasurements.length + 1]; + compressors[0] = + TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64).serialize(); + for (int i = 0; i < types.length; i++) { + compressors[i + 1] = + TSFileDescriptor.getInstance().getConfig().getCompressor(types[i]).serialize(); + } } public VectorMeasurementSchema( @@ -124,9 +165,24 @@ public class VectorMeasurementSchema return deviceId; } + @Deprecated // Aligned series should not invoke this method @Override public CompressionType getCompressor() { - return CompressionType.deserialize(compressor); + throw new UnsupportedOperationException("Aligned series should not invoke this method"); + } + + public CompressionType getTimeCompressor() { + if (compressors != null) { + return CompressionType.deserialize(compressors[0]); + } + return CompressionType.deserialize(unifiedCompressor); + } + + public CompressionType getValueCompressor(int index) { + if (compressors != null) { + return CompressionType.deserialize(compressors[index + 1]); + } + return CompressionType.deserialize(unifiedCompressor); } @Override @@ -276,7 +332,11 @@ public class VectorMeasurementSchema for (byte encoding : encodings) { byteLen += ReadWriteIOUtils.write(encoding, buffer); } - byteLen += ReadWriteIOUtils.write(compressor, buffer); + byteLen += 
ReadWriteIOUtils.write(unifiedCompressor, buffer); + if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + buffer.put(compressors); + byteLen += compressors.length; + } return byteLen; } @@ -297,7 +357,11 @@ public class VectorMeasurementSchema for (byte encoding : encodings) { byteLen += ReadWriteIOUtils.write(encoding, outputStream); } - byteLen += ReadWriteIOUtils.write(compressor, outputStream); + byteLen += ReadWriteIOUtils.write(unifiedCompressor, outputStream); + if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + outputStream.write(compressors); + byteLen += compressors.length; + } return byteLen; } @@ -348,7 +412,15 @@ public class VectorMeasurementSchema } vectorMeasurementSchema.encodings = encodings; - vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(inputStream); + vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(inputStream); + if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + byte[] compressors = new byte[measurementSize + 1]; + int read = inputStream.read(compressors); + if (read != compressors.length) { + throw new IOException("Unexpected end of stream when reading compressors"); + } + vectorMeasurementSchema.compressors = compressors; + } return vectorMeasurementSchema; } @@ -375,7 +447,12 @@ public class VectorMeasurementSchema } vectorMeasurementSchema.encodings = encodings; - vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(buffer); + vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(buffer); + if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + byte[] compressors = new byte[measurementSize + 1]; + buffer.get(compressors); + vectorMeasurementSchema.compressors = compressors; + } return vectorMeasurementSchema; } @@ -391,12 +468,13 @@ public class VectorMeasurementSchema return Arrays.equals(types, that.types) && Arrays.equals(encodings, that.encodings) && Objects.equals(deviceId, that.deviceId) - && Objects.equals(compressor, 
that.compressor); + && Objects.equals(unifiedCompressor, that.unifiedCompressor) + && Arrays.equals(compressors, that.compressors); } @Override public int hashCode() { - return Objects.hash(deviceId, types, encodings, compressor); + return Objects.hash(deviceId, Arrays.hashCode(types), Arrays.hashCode(encodings), unifiedCompressor, Arrays.hashCode(compressors)); } /** compare by vector name */ @@ -424,7 +502,14 @@ public class VectorMeasurementSchema TSEncoding.deserialize(encodings[entry.getValue()]).toString()); sc.addTail("],"); } - sc.addTail(CompressionType.deserialize(compressor).toString()); + if (unifiedCompressor != NO_UNIFIED_COMPRESSOR) { + sc.addTail(CompressionType.deserialize(unifiedCompressor).toString()); + } else { + for (byte compressor : compressors) { + sc.addTail(CompressionType.deserialize(compressor).toString()).addTail(","); + } + } + return sc.toString(); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index b7d71972..cc59fd20 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -322,6 +322,10 @@ public class TsFileIOWriter implements AutoCloseable { */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void endFile() throws IOException { + if (!canWrite) { + return; + } + checkInMemoryPathCount(); readChunkMetadataAndConstructIndexTree(); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java new file mode 100644 index 00000000..30e36055 --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tsfile.write; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.reader.IPageReader; +import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.read.reader.chunk.AlignedChunkReader; +import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.read.reader.chunk.TableChunkReader; +import org.apache.tsfile.read.reader.page.AlignedPageReader; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.tsfile.write.chunk.TimeChunkWriter; +import org.apache.tsfile.write.chunk.ValueChunkWriter; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.VectorMeasurementSchema; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertNull; + +public class ChunkRewriteTest { + + @Test + public void AlignedChunkSinglePageTest() throws IOException { + String[] measurements = new String[] {"s1", "s2", "s3"}; + TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE}; + VectorMeasurementSchema measurementSchema = + new VectorMeasurementSchema("root.sg.d1", measurements, types); + AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchema); + + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + + TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter(); + List<ValueChunkWriter> valueChunkWriters = chunkWriter.getValueChunkWriterList(); + + Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter); + + List<Chunk> valueChunks = getValueChunks(valueChunkWriters); + + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> pageReaders = chunkReader.loadPageReaderList(); + for (IPageReader page : pageReaders) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals(i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + i++; + } + } + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + // rewrite INT32->DOUBLE + Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, timeChunk); + valueChunks.set(1, newValueChunk); + 
AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList(); + for (IPageReader page : newPageReaders) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals((double) i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + i++; + } + assertEquals(20, i - 1); + } + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + + // + + } + + @Test + public void AlignedChunkMultiPagesTest() throws IOException { + String[] measurements = new String[] {"s1", "s2", "s3"}; + TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE}; + VectorMeasurementSchema measurementSchema = + new VectorMeasurementSchema("root.sg.d1", measurements, types); + AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchema); + + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + + for (int time = 21; time <= 40; time++) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + + TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter(); + List<ValueChunkWriter> valueChunkWriters = chunkWriter.getValueChunkWriterList(); + + Chunk timeChunk = getTimeChunk(measurementSchema, 
timeChunkWriter); + List<Chunk> valueChunks = getValueChunks(valueChunkWriters); + + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> pageReaders = chunkReader.loadPageReaderList(); + int i = 1; + for (IPageReader page : pageReaders) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals(i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + i++; + } + } + assertEquals(40, i - 1); + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + // rewrite INT32->DOUBLE + Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, timeChunk); + valueChunks.set(1, newValueChunk); + AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk, valueChunks); + i = 1; + List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList(); + for (IPageReader page : newPageReaders) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals((double) i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + i++; + } + } + assertEquals(40, i - 1); + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + } + + @Test + public void AlignedChunkWithNullTest() throws IOException { + String[] 
measurements = new String[] {"s1", "s2", "s3"}; + TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE}; + VectorMeasurementSchema measurementSchema = + new VectorMeasurementSchema("root.sg.d1", measurements, types); + AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchema); + + for (int time = 1; time <= 30; time = time + 3) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + + chunkWriter.write(time + 1, (float) (time + 1), true); + chunkWriter.write(time + 1, time + 1, false); + chunkWriter.write(time + 1, (double) (time + 1), true); + chunkWriter.write(time + 1); + + chunkWriter.write(time + 2, (float) (time + 1), true); + chunkWriter.write(time + 2, time + 1, true); + chunkWriter.write(time + 2, (double) (time + 1), true); + chunkWriter.write(time + 2); + } + chunkWriter.sealCurrentPage(); + + TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter(); + List<ValueChunkWriter> valueChunkWriters = chunkWriter.getValueChunkWriterList(); + + Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter); + List<Chunk> valueChunks = getValueChunks(valueChunkWriters); + + TableChunkReader chunkReader = new TableChunkReader(timeChunk, valueChunks, null); + List<IPageReader> pageReaders = chunkReader.loadPageReaderList(); + int i = 1; + for (IPageReader page : pageReaders) { + IPointReader pointReader = page.getAllSatisfiedPageData().getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + if (i % 3 == 1) { + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals(i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + } else if (i % 3 == 2) { + assertEquals((long) 
i, point.getTimestamp()); + assertNull(point.getValue().getVector()[0]); + assertEquals(i, point.getValue().getVector()[1].getValue()); + assertNull(point.getValue().getVector()[2]); + } else { + assertEquals((long) i, point.getTimestamp()); + assertNull(point.getValue().getVector()[0]); + assertNull(point.getValue().getVector()[1]); + assertNull(point.getValue().getVector()[2]); + } + i++; + } + } + assertEquals(30, i - 1); + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + // rewrite INT32->DOUBLE + Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, timeChunk); + valueChunks.set(1, newValueChunk); + TableChunkReader newChunkReader = new TableChunkReader(timeChunk, valueChunks, null); + i = 1; + List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList(); + for (IPageReader page : newPageReaders) { + IPointReader pointReader = page.getAllSatisfiedPageData().getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + if (i % 3 == 1) { + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals((double) i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + } else if (i % 3 == 2) { + assertEquals((long) i, point.getTimestamp()); + assertNull(point.getValue().getVector()[0]); + assertEquals((double) i, point.getValue().getVector()[1].getValue()); + assertNull(point.getValue().getVector()[2]); + } else { + assertEquals((long) i, point.getTimestamp()); + assertNull(point.getValue().getVector()[0]); + assertNull(point.getValue().getVector()[1]); + assertNull(point.getValue().getVector()[2]); + } + i++; + } + } + assertEquals(30, i - 1); + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + 
valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + } + + @Test + public void NonAlignedChunkMultiPagesTest() throws IOException { + IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.PLAIN); + ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema); + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, time); + } + chunkWriter.sealCurrentPage(); + for (int time = 21; time <= 40; time++) { + chunkWriter.write(time, time); + } + chunkWriter.sealCurrentPage(); + Chunk newChunk = getChunk(schema, chunkWriter); + ChunkReader chunkReader = new ChunkReader(newChunk); + List<IPageReader> pageReaders = chunkReader.loadPageReaderList(); + int i = 1; + for (IPageReader page : pageReaders) { + BatchData data = page.getAllSatisfiedPageData(true); + IPointReader pointReader = data.getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals(i, point.getValue().getValue()); + i++; + } + } + assertEquals(40, i - 1); + newChunk.getData().flip(); + // rewrite INT32->DOUBLE + Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE); + ChunkReader chunkReader2 = new ChunkReader(newChunk2); + List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList(); + i = 1; + for (IPageReader page : pageReaders2) { + BatchData data = page.getAllSatisfiedPageData(true); + IPointReader pointReader = data.getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((double) i, point.getValue().getValue()); + i++; + } + } + } + + @Test + public void NonAlignedChunkSinglePageTest() throws IOException { + IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.PLAIN); + ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema); + for (int 
time = 1; time <= 20; time++) { + chunkWriter.write(time, time); + } + chunkWriter.sealCurrentPage(); + + Chunk newChunk = getChunk(schema, chunkWriter); + ChunkReader chunkReader = new ChunkReader(newChunk); + List<IPageReader> pageReaders = chunkReader.loadPageReaderList(); + for (IPageReader page : pageReaders) { + BatchData data = page.getAllSatisfiedPageData(true); + IPointReader pointReader = data.getBatchDataIterator(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals(i, point.getValue().getValue()); + i++; + } + assertEquals(20, i - 1); + } + newChunk.getData().flip(); + // rewrite FLOAT->DOUBLE + Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE); + ChunkReader chunkReader2 = new ChunkReader(newChunk2); + List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList(); + for (IPageReader page : pageReaders2) { + BatchData data = page.getAllSatisfiedPageData(true); + IPointReader pointReader = data.getBatchDataIterator(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((double) i, point.getValue().getValue()); + i++; + } + } + } + + public Chunk getTimeChunk( + VectorMeasurementSchema measurementSchema, TimeChunkWriter timeChunkWriter) { + ByteBuffer newChunkData = timeChunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + measurementSchema.getMeasurementName(), + newChunkData.capacity(), + TSDataType.VECTOR, + measurementSchema.getTimeCompressor(), + measurementSchema.getTimeTSEncoding(), + timeChunkWriter.getNumOfPages()); + return new Chunk(newChunkHeader, newChunkData, null, timeChunkWriter.getStatistics()); + } + + public List<Chunk> getValueChunks(List<ValueChunkWriter> valueChunkWriters) { + List<Chunk> valueChunks = new ArrayList<>(); + for (ValueChunkWriter 
valueChunkWriter : valueChunkWriters) { + ByteBuffer valueChunkData = valueChunkWriter.getByteBuffer(); + ChunkHeader valueChunkHeader = + new ChunkHeader( + valueChunkWriter.getMeasurementId(), + valueChunkData.capacity(), + valueChunkWriter.getDataType(), + valueChunkWriter.getCompressionType(), + valueChunkWriter.getEncodingType(), + valueChunkWriter.getNumOfPages()); + Chunk valueChunk = + new Chunk(valueChunkHeader, valueChunkData, null, valueChunkWriter.getStatistics()); + valueChunks.add(valueChunk); + } + return valueChunks; + } + + public Chunk getChunk(IMeasurementSchema schema, ChunkWriterImpl chunkWriter) { + ByteBuffer newChunkData = chunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + schema.getMeasurementName(), + newChunkData.capacity(), + schema.getType(), + schema.getCompressor(), + schema.getEncodingType(), + chunkWriter.getNumOfPages()); + return new Chunk(newChunkHeader, newChunkData, null, chunkWriter.getStatistics()); + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java index dbda5319..7a385c0f 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java @@ -53,6 +53,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.junit.Assert.assertEquals; + public class TsFileIOWriterTest { private static final String FILE_PATH = @@ -103,9 +105,9 @@ public class TsFileIOWriterTest { TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH); // magic_string - Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic()); - Assert.assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber()); - Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic()); + assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic()); + 
assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber()); + assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic()); reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + 1); @@ -113,39 +115,39 @@ public class TsFileIOWriterTest { ChunkGroupHeader chunkGroupHeader; for (int i = 0; i < CHUNK_GROUP_NUM; i++) { // chunk group header - Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); + assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); chunkGroupHeader = reader.readChunkGroupHeader(); - Assert.assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID()); + assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID()); // ordinary chunk header - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals(SENSOR_1, header.getMeasurementID()); + assertEquals(SENSOR_1, header.getMeasurementID()); } for (int i = 0; i < CHUNK_GROUP_NUM; i++) { // chunk group header - Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); + assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); chunkGroupHeader = reader.readChunkGroupHeader(); - Assert.assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID()); + assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID()); // vector chunk header (time) - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("", header.getMeasurementID()); + assertEquals("", header.getMeasurementID()); // vector chunk header (values) - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("s1", header.getMeasurementID()); - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); + assertEquals("s1", header.getMeasurementID()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("s2", header.getMeasurementID()); + assertEquals("s2", header.getMeasurementID()); } - Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker()); + assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker()); reader.readPlanIndex(); - Assert.assertEquals(100, reader.getMinPlanIndex()); - Assert.assertEquals(10000, reader.getMaxPlanIndex()); + assertEquals(100, reader.getMinPlanIndex()); + assertEquals(10000, reader.getMaxPlanIndex()); - Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker()); + assertEquals(MetaMarker.SEPARATOR, reader.readMarker()); // make sure timeseriesMetadata is only Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = @@ -198,7 +200,7 @@ public class TsFileIOWriterTest { // vector chunk (time) writer.startFlushChunk( vectorMeasurementSchema.getMeasurementId(), - vectorMeasurementSchema.getCompressor(), + vectorMeasurementSchema.getTimeCompressor(), vectorMeasurementSchema.getType(), vectorMeasurementSchema.getTimeTSEncoding(), Statistics.getStatsByType(vectorMeasurementSchema.getType()), @@ -214,7 +216,7 @@ public class TsFileIOWriterTest { subStatistics.updateStats(0L, 0L); writer.startFlushChunk( vectorMeasurementSchema.getSubMeasurementsList().get(j), - vectorMeasurementSchema.getCompressor(), + vectorMeasurementSchema.getValueCompressor(j), vectorMeasurementSchema.getSubMeasurementsTSDataTypeList().get(j), vectorMeasurementSchema.getSubMeasurementsTSEncodingList().get(j), subStatistics,
