This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-4016 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 841d9e8880d5df396d94f3cf1fc3c111c2f866f5 Author: JackieTien97 <[email protected]> AuthorDate: Wed Aug 3 10:23:10 2022 +0800 Fix Calculating size for same TsBlock is not consistent between SinkHandle and SourceHandle --- .../operator/schema/SchemaFetchMergeOperator.java | 4 +- .../operator/schema/SchemaFetchScanOperator.java | 4 +- .../db/mpp/execution/ConfigExecutionTest.java | 3 +- .../block/column/BinaryArrayColumnEncoder.java | 26 ++-- .../block/column/ByteArrayColumnEncoder.java | 19 +-- .../block/column/Int32ArrayColumnEncoder.java | 37 +++--- .../block/column/Int64ArrayColumnEncoder.java | 64 ++++++---- .../tsfile/common/block/TsBlockSerdeTest.java | 139 ++++++++++++++++----- 8 files changed, 195 insertions(+), 101 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java index 416abb9c77..92b023eabb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java @@ -120,8 +120,6 @@ public class SchemaFetchMergeOperator implements ProcessOperator { return new TsBlock( new TimeColumn(1, new long[] {0}), new BinaryColumn( - 1, - Optional.of(new boolean[] {false}), - new Binary[] {new Binary(outputStream.toByteArray())})); + 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())})); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java index e35ed7aba2..3a22065879 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java @@ -126,8 +126,6 @@ public class SchemaFetchScanOperator implements SourceOperator { new TsBlock( new TimeColumn(1, new long[] {0}), new BinaryColumn( - 1, - Optional.of(new boolean[] {false}), - new Binary[] {new Binary(outputStream.toByteArray())})); + 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())})); } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java index f43e702772..83930c083b 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java @@ -65,8 +65,7 @@ public class ConfigExecutionTest { public void normalConfigTaskWithResultTest() { TsBlock tsBlock = new TsBlock( - new TimeColumn(1, new long[] {0}), - new IntColumn(1, Optional.of(new boolean[] {false}), new int[] {1})); + new TimeColumn(1, new long[] {0}), new IntColumn(1, Optional.empty(), new int[] {1})); DatasetHeader datasetHeader = new DatasetHeader( Collections.singletonList(new ColumnHeader("TestValue", TSDataType.INT32)), false); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryArrayColumnEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryArrayColumnEncoder.java index 715bfe7786..af707fb97c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryArrayColumnEncoder.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryArrayColumnEncoder.java @@ -44,24 +44,30 @@ public class BinaryArrayColumnEncoder implements ColumnEncoder { // | int32 | bytes | // +---------------+-------+ - boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount); + if (!TSDataType.TEXT.equals(dataType)) { + throw new IllegalArgumentException("Invalid data type: " + dataType); + } - if (TSDataType.TEXT.equals(dataType)) { - ColumnBuilder columnBuilder = new BinaryColumnBuilder(null, positionCount); + boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount); + Binary[] values = new Binary[positionCount]; + if (nullIndicators == null) { for (int i = 0; i < positionCount; i++) { - if (nullIndicators == null || !nullIndicators[i]) { + int length = input.getInt(); + byte[] value = new byte[length]; + input.get(value); + values[i] = new Binary(value); + } + } else { + for (int i = 0; i < positionCount; i++) { + if (!nullIndicators[i]) { int length = input.getInt(); byte[] value = new byte[length]; input.get(value); - columnBuilder.writeBinary(new Binary(value)); - } else { - columnBuilder.appendNull(); + values[i] = new Binary(value); } } - return columnBuilder.build(); - } else { - throw new IllegalArgumentException("Invalid data type: " + dataType); } + return new BinaryColumn(0, positionCount, nullIndicators, values); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ByteArrayColumnEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ByteArrayColumnEncoder.java index f4fac3a1f0..7636189a7b 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ByteArrayColumnEncoder.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ByteArrayColumnEncoder.java @@ -37,21 +37,14 @@ public class ByteArrayColumnEncoder implements ColumnEncoder { // | byte | list[byte] | list[byte] | // +---------------+-----------------+-------------+ - boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount); - if (TSDataType.BOOLEAN.equals(dataType)) { - BooleanColumnBuilder columnBuilder = new BooleanColumnBuilder(null, positionCount); - boolean[] values = ColumnEncoder.deserializeBooleanArray(input, positionCount); - for (int i = 0; i < positionCount; i++) { - if (nullIndicators == null || !nullIndicators[i]) { - columnBuilder.writeBoolean(values[i]); - } else { - columnBuilder.appendNull(); - } - } - return columnBuilder.build(); - } else { + if (!TSDataType.BOOLEAN.equals(dataType)) { throw new IllegalArgumentException("Invalid data type: " + dataType); } + + boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount); + boolean[] values = ColumnEncoder.deserializeBooleanArray(input, positionCount); + + return new BooleanColumn(0, positionCount, nullIndicators, values); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int32ArrayColumnEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int32ArrayColumnEncoder.java index 23339473fb..9abc4fca1e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int32ArrayColumnEncoder.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int32ArrayColumnEncoder.java @@ -39,30 +39,37 @@ public class Int32ArrayColumnEncoder implements ColumnEncoder { boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount); - ColumnBuilder columnBuilder; if (TSDataType.INT32.equals(dataType)) { - columnBuilder = new IntColumnBuilder(null, positionCount); - for (int i = 0; i < positionCount; i++) { - if (nullIndicators == null || !nullIndicators[i]) { - columnBuilder.writeInt(input.getInt()); - } else { - columnBuilder.appendNull(); + int[] values = new int[positionCount]; + if (nullIndicators == null) { + for (int i = 0; i < positionCount; i++) { + values[i] = input.getInt(); + } + } else { + for (int i = 0; i < positionCount; i++) { + if (!nullIndicators[i]) { + values[i] = input.getInt(); + } } } + return new IntColumn(0, positionCount, nullIndicators, values); } else if (TSDataType.FLOAT.equals(dataType)) { - columnBuilder = new FloatColumnBuilder(null, positionCount); - for (int i = 0; i < positionCount; i++) { - if (nullIndicators == null || !nullIndicators[i]) { - columnBuilder.writeFloat(Float.intBitsToFloat(input.getInt())); - } else { - columnBuilder.appendNull(); + float[] values = new float[positionCount]; + if (nullIndicators == null) { + for (int i = 0; i < positionCount; i++) { + values[i] = Float.intBitsToFloat(input.getInt()); + } + } else { + for (int i = 0; i < positionCount; i++) { + if (!nullIndicators[i]) { + values[i] = Float.intBitsToFloat(input.getInt()); + } } } + return new FloatColumn(0, positionCount, nullIndicators, values); } else { throw new IllegalArgumentException("Invalid data type: " + dataType); } - - return columnBuilder.build(); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int64ArrayColumnEncoder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int64ArrayColumnEncoder.java index 9264564185..7af3790bc7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int64ArrayColumnEncoder.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Int64ArrayColumnEncoder.java @@ -29,23 +29,28 @@ public class Int64ArrayColumnEncoder implements ColumnEncoder { @Override public TimeColumn readTimeColumn(ByteBuffer input, int positionCount) { - return (TimeColumn) - readColumnInternal(new TimeColumnBuilder(null, positionCount), input, positionCount); - } - @Override - public Column readColumn(ByteBuffer input, TSDataType dataType, int positionCount) { - if (TSDataType.INT64.equals(dataType)) { - return readColumnInternal(new LongColumnBuilder(null, positionCount), input, positionCount); - } else if (TSDataType.DOUBLE.equals(dataType)) { - return readColumnInternal(new DoubleColumnBuilder(null, positionCount), input, positionCount); + // Serialized data layout: + // +---------------+-----------------+-------------+ + // | may have null | null indicators | values | + // +---------------+-----------------+-------------+ + // | byte | list[byte] | list[int64] | + // +---------------+-----------------+-------------+ + + boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount); + long[] values = new long[positionCount]; + if (nullIndicators == null) { + for (int i = 0; i < positionCount; i++) { + values[i] = input.getLong(); + } + return new TimeColumn(0, positionCount, values); } else { - throw new IllegalArgumentException("Invalid data type: " + dataType); + throw new IllegalArgumentException("TimeColumn should not contain null values."); } } - private Column readColumnInternal( - ColumnBuilder columnBuilder, ByteBuffer input, int positionCount) { + @Override + public Column readColumn(ByteBuffer input, TSDataType dataType, int positionCount) { // Serialized data layout: // +---------------+-----------------+-------------+ @@ -55,27 +60,38 @@ public class Int64ArrayColumnEncoder implements ColumnEncoder { // +---------------+-----------------+-------------+ boolean[] nullIndicators = ColumnEncoder.deserializeNullIndicators(input, positionCount); - TSDataType dataType = columnBuilder.getDataType(); + if (TSDataType.INT64.equals(dataType)) { - for (int i = 0; i < positionCount; i++) { - if (nullIndicators == null || !nullIndicators[i]) { - columnBuilder.writeLong(input.getLong()); - } else { - columnBuilder.appendNull(); + long[] values = new long[positionCount]; + if (nullIndicators == null) { + for (int i = 0; i < positionCount; i++) { + values[i] = input.getLong(); + } + } else { + for (int i = 0; i < positionCount; i++) { + if (!nullIndicators[i]) { + values[i] = input.getLong(); + } } } + return new LongColumn(0, positionCount, nullIndicators, values); } else if (TSDataType.DOUBLE.equals(dataType)) { - for (int i = 0; i < positionCount; i++) { - if (nullIndicators == null || !nullIndicators[i]) { - columnBuilder.writeDouble(Double.longBitsToDouble(input.getLong())); - } else { - columnBuilder.appendNull(); + double[] values = new double[positionCount]; + if (nullIndicators == null) { + for (int i = 0; i < positionCount; i++) { + values[i] = Double.longBitsToDouble(input.getLong()); + } + } else { + for (int i = 0; i < positionCount; i++) { + if (!nullIndicators[i]) { + values[i] = Double.longBitsToDouble(input.getLong()); + } } } + return new DoubleColumn(0, positionCount, nullIndicators, values); } else { throw new IllegalArgumentException("Invalid data type: " + dataType); } - return columnBuilder.build(); } @Override diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockSerdeTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockSerdeTest.java index eb69736dbf..0ba21589fa 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockSerdeTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockSerdeTest.java @@ -22,18 +22,25 @@ package org.apache.iotdb.tsfile.common.block; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.ColumnEncoding; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde; import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class TsBlockSerdeTest { @Test @@ -72,41 +79,111 @@ public class TsBlockSerdeTest { ByteBuffer output = tsBlockSerde.serialize(tsBlockBuilder.build()); output.rewind(); int valueColumnCount = output.getInt(); - Assert.assertEquals(6, valueColumnCount); - Assert.assertEquals(TSDataType.INT32, TSDataType.deserialize(output.get())); - Assert.assertEquals(TSDataType.FLOAT, TSDataType.deserialize(output.get())); - Assert.assertEquals(TSDataType.INT64, TSDataType.deserialize(output.get())); - Assert.assertEquals(TSDataType.DOUBLE, TSDataType.deserialize(output.get())); - Assert.assertEquals(TSDataType.BOOLEAN, TSDataType.deserialize(output.get())); - Assert.assertEquals(TSDataType.TEXT, TSDataType.deserialize(output.get())); - Assert.assertEquals(positionCount, output.getInt()); - Assert.assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output)); - Assert.assertEquals(ColumnEncoding.INT32_ARRAY, ColumnEncoding.deserializeFrom(output)); - Assert.assertEquals(ColumnEncoding.INT32_ARRAY, ColumnEncoding.deserializeFrom(output)); - Assert.assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output)); - Assert.assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output)); - Assert.assertEquals(ColumnEncoding.BYTE_ARRAY, ColumnEncoding.deserializeFrom(output)); - Assert.assertEquals(ColumnEncoding.BINARY_ARRAY, ColumnEncoding.deserializeFrom(output)); + assertEquals(6, valueColumnCount); + assertEquals(TSDataType.INT32, TSDataType.deserialize(output.get())); + assertEquals(TSDataType.FLOAT, TSDataType.deserialize(output.get())); + assertEquals(TSDataType.INT64, TSDataType.deserialize(output.get())); + assertEquals(TSDataType.DOUBLE, TSDataType.deserialize(output.get())); + assertEquals(TSDataType.BOOLEAN, TSDataType.deserialize(output.get())); + assertEquals(TSDataType.TEXT, TSDataType.deserialize(output.get())); + assertEquals(positionCount, output.getInt()); + assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output)); + assertEquals(ColumnEncoding.INT32_ARRAY, ColumnEncoding.deserializeFrom(output)); + assertEquals(ColumnEncoding.INT32_ARRAY, ColumnEncoding.deserializeFrom(output)); + assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output)); + assertEquals(ColumnEncoding.INT64_ARRAY, ColumnEncoding.deserializeFrom(output)); + assertEquals(ColumnEncoding.BYTE_ARRAY, ColumnEncoding.deserializeFrom(output)); + assertEquals(ColumnEncoding.BINARY_ARRAY, ColumnEncoding.deserializeFrom(output)); output.rewind(); TsBlock tsBlock = tsBlockSerde.deserialize(output); - Assert.assertEquals(valueColumnCount, tsBlock.getValueColumnCount()); - Assert.assertEquals(TSDataType.INT32, tsBlock.getColumn(0).getDataType()); - Assert.assertEquals(TSDataType.FLOAT, tsBlock.getColumn(1).getDataType()); - Assert.assertEquals(TSDataType.INT64, tsBlock.getColumn(2).getDataType()); - Assert.assertEquals(TSDataType.DOUBLE, tsBlock.getColumn(3).getDataType()); - Assert.assertEquals(TSDataType.BOOLEAN, tsBlock.getColumn(4).getDataType()); - Assert.assertEquals(TSDataType.TEXT, tsBlock.getColumn(5).getDataType()); - Assert.assertEquals(positionCount, tsBlock.getPositionCount()); - Assert.assertEquals(ColumnEncoding.INT32_ARRAY, tsBlock.getColumn(0).getEncoding()); - Assert.assertEquals(ColumnEncoding.INT32_ARRAY, tsBlock.getColumn(1).getEncoding()); - Assert.assertEquals(ColumnEncoding.INT64_ARRAY, tsBlock.getColumn(2).getEncoding()); - Assert.assertEquals(ColumnEncoding.INT64_ARRAY, tsBlock.getColumn(3).getEncoding()); - Assert.assertEquals(ColumnEncoding.BYTE_ARRAY, tsBlock.getColumn(4).getEncoding()); - Assert.assertEquals(ColumnEncoding.BINARY_ARRAY, tsBlock.getColumn(5).getEncoding()); + assertEquals(valueColumnCount, tsBlock.getValueColumnCount()); + assertEquals(TSDataType.INT32, tsBlock.getColumn(0).getDataType()); + assertEquals(TSDataType.FLOAT, tsBlock.getColumn(1).getDataType()); + assertEquals(TSDataType.INT64, tsBlock.getColumn(2).getDataType()); + assertEquals(TSDataType.DOUBLE, tsBlock.getColumn(3).getDataType()); + assertEquals(TSDataType.BOOLEAN, tsBlock.getColumn(4).getDataType()); + assertEquals(TSDataType.TEXT, tsBlock.getColumn(5).getDataType()); + assertEquals(positionCount, tsBlock.getPositionCount()); + assertEquals(ColumnEncoding.INT32_ARRAY, tsBlock.getColumn(0).getEncoding()); + assertEquals(ColumnEncoding.INT32_ARRAY, tsBlock.getColumn(1).getEncoding()); + assertEquals(ColumnEncoding.INT64_ARRAY, tsBlock.getColumn(2).getEncoding()); + assertEquals(ColumnEncoding.INT64_ARRAY, tsBlock.getColumn(3).getEncoding()); + assertEquals(ColumnEncoding.BYTE_ARRAY, tsBlock.getColumn(4).getEncoding()); + assertEquals(ColumnEncoding.BINARY_ARRAY, tsBlock.getColumn(5).getEncoding()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testSerializeAndDeserialize2() { + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try { + // to indicate this binary data is storage group info + ReadWriteIOUtils.write((byte) 0, outputStream); + + ReadWriteIOUtils.write(1, outputStream); + ReadWriteIOUtils.write("root.test.g_0", outputStream); + } catch (IOException e) { + // Totally memory operation. This case won't happen. + fail(e.getMessage()); + } + + TsBlock tsBlock = + new TsBlock( + new TimeColumn(1, new long[] {0}), + new BinaryColumn( + 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())})); + + TsBlockSerde tsBlockSerde = new TsBlockSerde(); + try { + ByteBuffer output = tsBlockSerde.serialize(tsBlock); + output.rewind(); + + TsBlock deserializedTsBlock = tsBlockSerde.deserialize(output); + assertEquals(tsBlock.getRetainedSizeInBytes(), deserializedTsBlock.getRetainedSizeInBytes()); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testSerializeAndDeserialize3() { + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try { + // to indicate this binary data is storage group info + ReadWriteIOUtils.write((byte) 0, outputStream); + + ReadWriteIOUtils.write(1, outputStream); + ReadWriteIOUtils.write("root.test.g_0", outputStream); + } catch (IOException e) { + // Totally memory operation. This case won't happen. + fail(e.getMessage()); + } + + TsBlock tsBlock = + new TsBlock( + new TimeColumn(1, new long[] {0}), + new BinaryColumn( + 1, + Optional.of(new boolean[] {false}), + new Binary[] {new Binary(outputStream.toByteArray())})); + + TsBlockSerde tsBlockSerde = new TsBlockSerde(); + try { + ByteBuffer output = tsBlockSerde.serialize(tsBlock); + output.rewind(); + + TsBlock deserializedTsBlock = tsBlockSerde.deserialize(output); + assertEquals(tsBlock.getRetainedSizeInBytes(), deserializedTsBlock.getRetainedSizeInBytes()); } catch (IOException e) { e.printStackTrace(); - Assert.fail(); + fail(); } } }
