This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-2079 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit d24ab9c438039cf8c6ce7ed5b54b8d4c29897f25 Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 22 23:32:15 2025 +0800 use the same row format in nested row (like, row<array<row>>) --- .../java/org/apache/fluss/row/BinaryArray.java | 50 +++++---- .../main/java/org/apache/fluss/row/BinaryRow.java | 19 ++++ .../org/apache/fluss/row/BinarySegmentUtils.java | 58 ++++------ .../java/org/apache/fluss/row/BinaryWriter.java | 22 +++- .../org/apache/fluss/row/aligned/AlignedRow.java | 6 +- .../org/apache/fluss/row/array/AlignedArray.java | 42 +++++++ .../org/apache/fluss/row/array/CompactedArray.java | 70 ++++++++++++ .../org/apache/fluss/row/array/IndexedArray.java | 69 ++++++++++++ .../fluss/row/array/PrimitiveBinaryArray.java | 43 ++++++++ .../row/arrow/vectors/ArrowRowColumnVector.java | 35 +----- .../fluss/row/compacted/CompactedKeyWriter.java | 4 +- .../fluss/row/compacted/CompactedRowReader.java | 23 ++-- .../apache/fluss/row/encode/AlignedRowEncoder.java | 69 ++++++++++++ .../fluss/row/encode/CompactedRowEncoder.java | 4 +- .../apache/fluss/row/encode/IndexedRowEncoder.java | 4 +- .../org/apache/fluss/row/indexed/IndexedRow.java | 37 +++++-- .../apache/fluss/row/indexed/IndexedRowReader.java | 21 ++-- .../apache/fluss/row/indexed/IndexedRowWriter.java | 105 ------------------ .../fluss/row/serializer/ArraySerializer.java | 25 ++++- .../apache/fluss/row/serializer/RowSerializer.java | 79 +++++++++----- .../java/org/apache/fluss/utils/ArrowUtils.java | 13 ++- .../java/org/apache/fluss/row/BinaryArrayTest.java | 121 +++++++-------------- .../apache/fluss/row/BinaryArrayWriterTest.java | 45 ++++---- .../org/apache/fluss/row/BinaryWriterTest.java | 104 +++++++++--------- .../org/apache/fluss/row/InternalArrayTest.java | 3 +- .../apache/fluss/row/TestInternalRowGenerator.java | 3 +- .../apache/fluss/row/aligned/AlignedRowTest.java | 3 +- .../compacted/CompactedRowDeserializerTest.java | 8 +- .../row/compacted/CompactedRowWriterTest.java | 3 +- .../fluss/row/indexed/IndexedRowReaderTest.java | 8 +- .../apache/fluss/row/indexed/IndexedRowTest.java | 9 +- 31 files changed, 675 insertions(+), 430 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java b/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java index f2f3d4663..0aef1b0b6 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java @@ -19,6 +19,7 @@ package org.apache.fluss.row; import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.row.array.PrimitiveBinaryArray; import org.apache.fluss.types.DataType; import java.lang.reflect.Array; @@ -48,7 +49,7 @@ import static org.apache.fluss.utils.Preconditions.checkArgument; * @since 0.9 */ @PublicEvolving -public final class BinaryArray extends BinarySection +public abstract class BinaryArray extends BinarySection implements InternalArray, MemoryAwareGetters, DataSetters { private static final long serialVersionUID = 1L; @@ -111,9 +112,7 @@ public final class BinaryArray extends BinarySection /** The position to start storing array elements. */ private transient int elementOffset; - public BinaryArray() {} - - private void assertIndexIsValid(int ordinal) { + protected void assertIndexIsValid(int ordinal) { assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0"; assert ordinal < size : "ordinal (" + ordinal + ") should < " + size; } @@ -265,16 +264,14 @@ public final class BinaryArray extends BinarySection @Override public InternalArray getArray(int pos) { assertIndexIsValid(pos); - return BinarySegmentUtils.readBinaryArray(segments, offset, getLong(pos)); + return BinarySegmentUtils.readBinaryArray( + segments, offset, getLong(pos), createNestedArrayInstance()); } - // TODO: getMap() will be added in Issue #1973 + /** Creates a nested {@link BinaryArray} with the nested data type information. */ + protected abstract BinaryArray createNestedArrayInstance(); - @Override - public InternalRow getRow(int pos, int numFields) { - assertIndexIsValid(pos); - return BinarySegmentUtils.readBinaryRow(segments, offset, numFields, getLong(pos)); - } + // TODO: getMap() will be added in Issue #1973 @Override public boolean getBoolean(int pos) { @@ -550,21 +547,26 @@ public final class BinaryArray extends BinarySection return values; } - public BinaryArray copy() { - return copy(new BinaryArray()); - } - - public BinaryArray copy(BinaryArray reuse) { - byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes); - reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes); - return reuse; - } - @Override public int hashCode() { return BinarySegmentUtils.hash(segments, offset, sizeInBytes); } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + // override equals and only checks the other object is instance of BinaryArray + if (!(o instanceof BinaryArray)) { + return false; + } + final BinarySection that = (BinarySection) o; + return sizeInBytes == that.sizeInBytes + && BinarySegmentUtils.equals( + segments, offset, that.segments, that.offset, sizeInBytes); + } + // ------------------------------------------------------------------------------------------ // Construction Utilities // ------------------------------------------------------------------------------------------ @@ -616,13 +618,13 @@ public final class BinaryArray extends BinarySection UNSAFE.copyMemory( arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes); - BinaryArray result = new BinaryArray(); + BinaryArray result = new PrimitiveBinaryArray(); result.pointTo(MemorySegment.wrap(data), 0, (int) totalSize); return result; } public static BinaryArray fromLongArray(Long[] arr) { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.length, 8); for (int i = 0; i < arr.length; i++) { Long v = arr[i]; @@ -641,7 +643,7 @@ public final class BinaryArray extends BinarySection return (BinaryArray) arr; } - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.size(), 8); for (int i = 0; i < arr.size(); i++) { if (arr.isNullAt(i)) { diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java b/fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java index a7afaa8a3..2235a34a7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java @@ -18,6 +18,9 @@ package org.apache.fluss.row; import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.row.aligned.AlignedRow; +import org.apache.fluss.row.compacted.CompactedRow; +import org.apache.fluss.row.indexed.IndexedRow; /** * A binary format {@link InternalRow} that is backed on {@link MemorySegment} and supports all @@ -57,4 +60,20 @@ public interface BinaryRow extends InternalRow, MemoryAwareGetters { * @param sizeInBytes The size of the row. */ void pointTo(MemorySegment[] segments, int offset, int sizeInBytes); + + /** + * The binary row format types, it indicates the generated {@link BinaryRow} type by the {@link + * BinaryWriter}. + */ + enum BinaryRowFormat { + + /** Compacted binary row format, see {@link CompactedRow}. */ + COMPACTED, + + /** Aligned binary row format, see {@link AlignedRow}. */ + ALIGNED, + + /** Indexed binary row format, see {@link IndexedRow}. */ + INDEXED + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java b/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java index 41b86ff18..bd532634a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java @@ -20,6 +20,7 @@ package org.apache.fluss.row; import org.apache.fluss.annotation.Internal; import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.memory.OutputView; +import org.apache.fluss.row.aligned.AlignedRow; import org.apache.fluss.row.compacted.CompactedRow; import org.apache.fluss.row.indexed.IndexedRow; import org.apache.fluss.types.DataType; @@ -989,14 +990,16 @@ public final class BinarySegmentUtils { return Decimal.fromUnscaledBytes(bytes, precision, scale); } - /** Gets an instance of {@link InternalArray} from underlying {@link MemorySegment}. */ + /** + * Read the array data into the reused {@link BinaryArray} instance from underlying {@link + * MemorySegment}. + */ public static BinaryArray readBinaryArray( - MemorySegment[] segments, int baseOffset, long offsetAndSize) { + MemorySegment[] segments, int baseOffset, long offsetAndSize, BinaryArray reusedArray) { final int size = ((int) offsetAndSize); int offset = (int) (offsetAndSize >> 32); - BinaryArray array = new BinaryArray(); - array.pointTo(segments, offset + baseOffset, size); - return array; + reusedArray.pointTo(segments, offset + baseOffset, size); + return reusedArray; } /** Read map data from segments. */ @@ -1013,48 +1016,33 @@ public final class BinarySegmentUtils { "Map type is not supported yet. Will be added in Issue #1973."); } - /** Gets an instance of {@link BinaryRow} from underlying {@link MemorySegment}. */ - public static BinaryRow readBinaryRow( - MemorySegment[] segments, int baseOffset, int numFields, long offsetAndSize) { + /** Read aligned row from segments. */ + public static InternalRow readAlignedRow( + MemorySegment[] segments, int baseOffset, long offsetAndSize, int numFields) { final int size = ((int) offsetAndSize); int offset = (int) (offsetAndSize >> 32); - org.apache.fluss.row.aligned.AlignedRow row = - new org.apache.fluss.row.aligned.AlignedRow(numFields); - row.pointTo(segments, offset + baseOffset, size); + AlignedRow row = new AlignedRow(numFields); + row.pointTo(segments, baseOffset + offset, size); return row; } - /** Read indexed row data from segments. */ + /** Read indexed row from segments. */ public static InternalRow readIndexedRow( - MemorySegment[] segments, - int offset, - int numBytes, - int numFields, - DataType[] fieldTypes) { - return readIndexedRowData(segments, offset, numBytes, numFields, fieldTypes); - } - - /** Read IndexedRow data from segments. */ - public static IndexedRow readIndexedRowData( - MemorySegment[] segments, - int offset, - int numBytes, - int numFields, - DataType[] fieldTypes) { + MemorySegment[] segments, int baseOffset, long offsetAndSize, DataType[] fieldTypes) { + final int size = ((int) offsetAndSize); + int offset = (int) (offsetAndSize >> 32); IndexedRow row = new IndexedRow(fieldTypes); - row.pointTo(segments, offset, numBytes); + row.pointTo(segments, baseOffset + offset, size); return row; } - /** Read compacted row data from segments. */ + /** Read compacted row from segments. */ public static InternalRow readCompactedRow( - MemorySegment[] segments, - int offset, - int numBytes, - int numFields, - DataType[] fieldTypes) { + MemorySegment[] segments, int baseOffset, long offsetAndSize, DataType[] fieldTypes) { + final int size = ((int) offsetAndSize); + int offset = (int) (offsetAndSize >> 32); CompactedRow row = new CompactedRow(fieldTypes); - row.pointTo(segments, offset, numBytes); + row.pointTo(segments, baseOffset + offset, size); return row; } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java index f59d57c98..5c542f92f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java @@ -18,12 +18,15 @@ package org.apache.fluss.row; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.row.BinaryRow.BinaryRowFormat; import org.apache.fluss.row.serializer.ArraySerializer; import org.apache.fluss.row.serializer.RowSerializer; import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; +import javax.annotation.Nullable; + import java.io.Serializable; import static org.apache.fluss.types.DataTypeChecks.getLength; @@ -87,9 +90,12 @@ public interface BinaryWriter { * Creates an accessor for setting the elements of a binary writer during runtime. * * @param elementType the element type + * @param rowFormat the binary row format, it is required when the element type has nested row + * type, otherwise, {@link IllegalArgumentException} will be thrown. */ - static BinaryWriter.ValueWriter createValueWriter(DataType elementType) { - BinaryWriter.ValueWriter valueWriter = createNotNullValueWriter(elementType); + static BinaryWriter.ValueWriter createValueWriter( + DataType elementType, BinaryRowFormat rowFormat) { + BinaryWriter.ValueWriter valueWriter = createNotNullValueWriter(elementType, rowFormat); if (!elementType.isNullable()) { return valueWriter; } @@ -108,7 +114,8 @@ public interface BinaryWriter { * * @param elementType the element type */ - static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) { + private static BinaryWriter.ValueWriter createNotNullValueWriter( + DataType elementType, @Nullable BinaryRowFormat rowFormat) { switch (elementType.getTypeRoot()) { case CHAR: int charLength = getLength(elementType); @@ -152,7 +159,7 @@ public interface BinaryWriter { writer.writeTimestampLtz(pos, (TimestampLtz) value, timestampLtzPrecision); case ARRAY: final ArraySerializer arraySerializer = - new ArraySerializer(((ArrayType) elementType).getElementType()); + new ArraySerializer(((ArrayType) elementType).getElementType(), rowFormat); return (writer, pos, value) -> writer.writeArray(pos, (InternalArray) value, arraySerializer); @@ -161,9 +168,14 @@ public interface BinaryWriter { throw new UnsupportedOperationException( "Map type is not supported yet. Will be added in Issue #1973."); case ROW: + if (rowFormat == null) { + throw new IllegalArgumentException( + "Binary row format is required to write row."); + } final RowType rowType = (RowType) elementType; final RowSerializer rowSerializer = - new RowSerializer(rowType.getFieldTypes().toArray(new DataType[0])); + new RowSerializer( + rowType.getFieldTypes().toArray(new DataType[0]), rowFormat); return (writer, pos, value) -> writer.writeRow(pos, (InternalRow) value, rowSerializer); default: diff --git a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java index 314ed4714..dad2b5822 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java @@ -30,6 +30,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.NullAwareGetters; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.array.AlignedArray; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DecimalType; import org.apache.fluss.types.LocalZonedTimestampType; @@ -390,7 +391,8 @@ public final class AlignedRow extends BinarySection assertIndexIsValid(pos); int fieldOffset = getFieldOffset(pos); final long offsetAndSize = segments[0].getLong(fieldOffset); - return BinarySegmentUtils.readBinaryArray(segments, offset, offsetAndSize); + return BinarySegmentUtils.readBinaryArray( + segments, offset, offsetAndSize, new AlignedArray()); } // TODO: getMap() will be added in Issue #1973 @@ -400,7 +402,7 @@ public final class AlignedRow extends BinarySection assertIndexIsValid(pos); int fieldOffset = getFieldOffset(pos); final long offsetAndSize = segments[0].getLong(fieldOffset); - return BinarySegmentUtils.readBinaryRow(segments, offset, numFields, offsetAndSize); + return BinarySegmentUtils.readAlignedRow(segments, offset, offsetAndSize, numFields); } /** The bit is 1 when the field is null. Default is 0. */ diff --git a/fluss-common/src/main/java/org/apache/fluss/row/array/AlignedArray.java b/fluss-common/src/main/java/org/apache/fluss/row/array/AlignedArray.java new file mode 100644 index 000000000..9919f46b3 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/array/AlignedArray.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.row.array; + +import org.apache.fluss.row.BinaryArray; +import org.apache.fluss.row.BinarySegmentUtils; +import org.apache.fluss.row.InternalRow; + +/** + * A {@link BinaryArray} that uses {@link org.apache.fluss.row.aligned.AlignedRow} as the binary + * format for arrays of nested row type. + */ +public class AlignedArray extends BinaryArray { + private static final long serialVersionUID = 1L; + + @Override + public InternalRow getRow(int pos, int numFields) { + return BinarySegmentUtils.readAlignedRow(segments, offset, getLong(pos), numFields); + } + + @Override + protected BinaryArray createNestedArrayInstance() { + return new AlignedArray(); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/array/CompactedArray.java b/fluss-common/src/main/java/org/apache/fluss/row/array/CompactedArray.java new file mode 100644 index 000000000..566d74cc0 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/array/CompactedArray.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.row.array; + +import org.apache.fluss.row.BinaryArray; +import org.apache.fluss.row.BinarySegmentUtils; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.ArrayType; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +/** + * A {@link BinaryArray} that uses {@link org.apache.fluss.row.compacted.CompactedRow} as the binary + * format for arrays of nested row type. + */ +public class CompactedArray extends BinaryArray { + + private final DataType elementType; + + private transient DataType[] nestedFields; + + public CompactedArray(DataType elementType) { + this.elementType = elementType; + } + + @Override + public InternalRow getRow(int pos, int numFields) { + assertIndexIsValid(pos); + if (elementType instanceof RowType) { + if (nestedFields == null) { + nestedFields = ((RowType) elementType).getFieldTypes().toArray(new DataType[0]); + } + if (nestedFields.length != numFields) { + throw new IllegalArgumentException( + "Unexpected number of fields " + numFields + " for " + elementType); + } + return BinarySegmentUtils.readCompactedRow( + segments, offset, getLong(pos), nestedFields); + } else { + throw new IllegalArgumentException("Can not get row from Array of type " + elementType); + } + } + + @Override + protected BinaryArray createNestedArrayInstance() { + if (elementType instanceof ArrayType) { + return new CompactedArray(((ArrayType) elementType).getElementType()); + } else { + throw new IllegalArgumentException( + "Can not get nested array from Array of type " + elementType); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/array/IndexedArray.java b/fluss-common/src/main/java/org/apache/fluss/row/array/IndexedArray.java new file mode 100644 index 000000000..860da18fb --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/array/IndexedArray.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.row.array; + +import org.apache.fluss.row.BinaryArray; +import org.apache.fluss.row.BinarySegmentUtils; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.ArrayType; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +/** + * A {@link BinaryArray} that uses {@link org.apache.fluss.row.indexed.IndexedRow} as the binary + * format for arrays of nested row type. + */ +public class IndexedArray extends BinaryArray { + + private final DataType elementType; + + private transient DataType[] nestedFields; + + public IndexedArray(DataType elementType) { + this.elementType = elementType; + } + + @Override + public InternalRow getRow(int pos, int numFields) { + assertIndexIsValid(pos); + if (elementType instanceof RowType) { + if (nestedFields == null) { + nestedFields = ((RowType) elementType).getFieldTypes().toArray(new DataType[0]); + } + if (nestedFields.length != numFields) { + throw new IllegalArgumentException( + "Unexpected number of fields " + numFields + " for " + elementType); + } + return BinarySegmentUtils.readIndexedRow(segments, offset, getLong(pos), nestedFields); + } else { + throw new IllegalArgumentException("Can not get row from Array of type " + elementType); + } + } + + @Override + protected BinaryArray createNestedArrayInstance() { + if (elementType instanceof ArrayType) { + return new IndexedArray(((ArrayType) elementType).getElementType()); + } else { + throw new IllegalArgumentException( + "Can not get nested array from Array of type " + elementType); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/array/PrimitiveBinaryArray.java b/fluss-common/src/main/java/org/apache/fluss/row/array/PrimitiveBinaryArray.java new file mode 100644 index 000000000..d91a0594d --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/array/PrimitiveBinaryArray.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.row.array; + +import org.apache.fluss.row.BinaryArray; +import org.apache.fluss.row.InternalRow; + +/** + * A BinaryArray implementation for primitive types (except complex types) which does not support + * getRow operation. + */ +public class PrimitiveBinaryArray extends BinaryArray { + private static final long serialVersionUID = 1L; + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new IllegalArgumentException("Can not get nested row from array of primitive type."); + } + + @Override + protected BinaryArray createNestedArrayInstance() { + // this should never be called from a primitive array, + // however, we still return a placeholder + return new PrimitiveBinaryArray(); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowRowColumnVector.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowRowColumnVector.java index de5b4155f..0c325a53f 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowRowColumnVector.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowRowColumnVector.java @@ -19,53 +19,26 @@ package org.apache.fluss.row.arrow.vectors; import org.apache.fluss.row.InternalRow; -import org.apache.fluss.row.columnar.ColumnVector; import org.apache.fluss.row.columnar.ColumnarRow; import org.apache.fluss.row.columnar.RowColumnVector; import org.apache.fluss.row.columnar.VectorizedColumnBatch; -import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector; -import org.apache.fluss.types.RowType; -import org.apache.fluss.utils.ArrowUtils; - -import java.util.List; /** ArrowRowColumnVector is a wrapper class for Arrow RowVector. */ public class ArrowRowColumnVector implements RowColumnVector { - private boolean inited = false; - private final FieldVector vector; - private final RowType rowType; - private VectorizedColumnBatch vectorizedColumnBatch; + private final StructVector vector; + private final VectorizedColumnBatch vectorizedColumnBatch; - public ArrowRowColumnVector(FieldVector vector, RowType rowType) { + public ArrowRowColumnVector(StructVector vector, VectorizedColumnBatch columnBatch) { this.vector = vector; - this.rowType = rowType; - } - - private void init() { - if (!inited) { - List<FieldVector> children = ((StructVector) vector).getChildrenFromFields(); - ColumnVector[] vectors = new ColumnVector[children.size()]; - for (int i = 0; i < children.size(); i++) { - vectors[i] = - ArrowUtils.createArrowColumnVector(children.get(i), rowType.getTypeAt(i)); - } - this.vectorizedColumnBatch = new VectorizedColumnBatch(vectors); - inited = true; - } + this.vectorizedColumnBatch = columnBatch; } @Override public InternalRow getRow(int i) { - init(); return new ColumnarRow(vectorizedColumnBatch, i); } - public VectorizedColumnBatch getBatch() { - init(); - return vectorizedColumnBatch; - } - @Override public boolean isNullAt(int i) { return vector.isNull(i); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedKeyWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedKeyWriter.java index ba3e088c4..d960785e0 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedKeyWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedKeyWriter.java @@ -20,6 +20,8 @@ package org.apache.fluss.row.compacted; import org.apache.fluss.row.BinaryWriter; import org.apache.fluss.types.DataType; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED; + /** * A wrapping of {@link CompactedRowWriter} used to encode key columns. * @@ -35,7 +37,7 @@ public class CompactedKeyWriter extends CompactedRowWriter { } public static ValueWriter createValueWriter(DataType fieldType) { - ValueWriter valueWriter = BinaryWriter.createValueWriter(fieldType); + ValueWriter valueWriter = BinaryWriter.createValueWriter(fieldType, COMPACTED); return (writer, pos, value) -> { if (value == null) { throw new IllegalArgumentException( diff --git a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java index dc3e774ad..ad5d1b9e0 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java @@ -25,6 +25,8 @@ import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.array.CompactedArray; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; @@ -320,12 +322,14 @@ public class CompactedRowReader { fieldReader = (reader, pos) -> reader.readTimestampLtz(timestampLtzPrecision); break; case ARRAY: - fieldReader = (reader, pos) -> reader.readArray(); + DataType elementType = ((ArrayType) fieldType).getElementType(); + fieldReader = (reader, pos) -> reader.readArray(elementType); break; case ROW: - final int rowFieldCount = ((RowType) fieldType).getFieldCount(); - fieldReader = (reader, pos) -> reader.readRow(rowFieldCount); + DataType[] nestedFieldTypes = + ((RowType) fieldType).getFieldTypes().toArray(new DataType[0]); + fieldReader = (reader, pos) -> reader.readRow(nestedFieldTypes); break; default: throw new IllegalArgumentException("Unsupported type for IndexedRow: " + fieldType); @@ -341,18 +345,19 @@ public class CompactedRowReader { }; } - public InternalArray readArray() { + public InternalArray readArray(DataType elementType) { int length = readInt(); - InternalArray array = BinarySegmentUtils.readBinaryArray(segments, position, length); + InternalArray array = + BinarySegmentUtils.readBinaryArray( + segments, position, length, new CompactedArray(elementType)); position += length; return array; } - public InternalRow readRow(int numFields) { + public InternalRow readRow(DataType[] nestedFieldTypes) { int length = readInt(); - InternalRow row = - BinarySegmentUtils.readBinaryRow( - segments, 0, numFields, ((long) position << 32) | length); + CompactedRow row = new CompactedRow(nestedFieldTypes); + row.pointTo(segments, position, length); position += length; return row; } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/AlignedRowEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/AlignedRowEncoder.java new file mode 100644 index 000000000..0bee6f4c3 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/AlignedRowEncoder.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.row.encode; + +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.BinaryWriter; +import org.apache.fluss.row.aligned.AlignedRow; +import org.apache.fluss.row.aligned.AlignedRowWriter; +import org.apache.fluss.types.DataType; + +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.ALIGNED; + +/** + * A {@link RowEncoder} for {@link AlignedRow}. + * + * @since 0.9 + */ +public class AlignedRowEncoder implements RowEncoder { + private final AlignedRow reuseRow; + private final AlignedRowWriter reuseWriter; + private final BinaryWriter.ValueWriter[] valueWriters; + + public AlignedRowEncoder(DataType[] fieldTypes) { + this.reuseRow = new AlignedRow(fieldTypes.length); + this.reuseWriter = new AlignedRowWriter(reuseRow); + this.valueWriters = new BinaryWriter.ValueWriter[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + valueWriters[i] = BinaryWriter.createValueWriter(fieldTypes[i], ALIGNED); + } + } + + @Override + public void startNewRow() { + reuseWriter.reset(); + } + + @Override + public void encodeField(int pos, Object value) { + valueWriters[pos].writeValue(reuseWriter, pos, value); + } + + @Override + public BinaryRow finishRow() { + reuseWriter.complete(); + return reuseRow; + } + + @Override + public void close() throws Exception { + // nothing to close + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java index 76c04f777..546ce45a2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java @@ -24,6 +24,8 @@ import org.apache.fluss.row.compacted.CompactedRowDeserializer; import org.apache.fluss.row.compacted.CompactedRowWriter; import org.apache.fluss.types.DataType; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED; + /** * A {@link RowEncoder} for {@link CompactedRow}. * @@ -43,7 +45,7 @@ public class CompactedRowEncoder implements RowEncoder { writer = new CompactedRowWriter(fieldDataTypes.length); fieldWriters = new BinaryWriter.ValueWriter[fieldDataTypes.length]; for (int i = 0; i < fieldDataTypes.length; i++) { - fieldWriters[i] = BinaryWriter.createValueWriter(fieldDataTypes[i]); + fieldWriters[i] = BinaryWriter.createValueWriter(fieldDataTypes[i], COMPACTED); } this.compactedRowDeserializer = new CompactedRowDeserializer(fieldDataTypes); } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/IndexedRowEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/IndexedRowEncoder.java index 0bb552e5a..53fc9fac1 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/IndexedRowEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/IndexedRowEncoder.java @@ -24,6 +24,8 @@ import org.apache.fluss.row.indexed.IndexedRowWriter; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED; + /** * A {@link RowEncoder} for {@link IndexedRow}. * @@ -46,7 +48,7 @@ public class IndexedRowEncoder implements RowEncoder { this.fieldWriters = new BinaryWriter.ValueWriter[fieldDataTypes.length]; this.rowWriter = new IndexedRowWriter(fieldDataTypes); for (int i = 0; i < fieldDataTypes.length; i++) { - fieldWriters[i] = BinaryWriter.createValueWriter(fieldDataTypes[i]); + fieldWriters[i] = BinaryWriter.createValueWriter(fieldDataTypes[i], INDEXED); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java index cf4fd4cb0..a42fe1639 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java @@ -30,12 +30,14 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.NullAwareGetters; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.array.IndexedArray; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.BinaryType; import org.apache.fluss.types.CharType; import org.apache.fluss.types.DataType; -import org.apache.fluss.types.DataTypeRoot; import org.apache.fluss.types.DecimalType; import org.apache.fluss.types.IntType; +import org.apache.fluss.types.RowType; import org.apache.fluss.types.StringType; import org.apache.fluss.utils.MurmurHashUtils; @@ -237,7 +239,7 @@ public class IndexedRow implements BinaryRow, NullAwareGetters { BinaryWriter.ValueWriter[] writers = new BinaryWriter.ValueWriter[newType.length]; for (int i = 0; i < newType.length; i++) { fieldGetter[i] = InternalRow.createFieldGetter(newType[i], fields[i]); - writers[i] = BinaryWriter.createValueWriter(newType[i]); + writers[i] = BinaryWriter.createValueWriter(newType[i], BinaryRowFormat.INDEXED); } IndexedRow projectRow = new IndexedRow(newType); @@ -384,7 +386,15 @@ public class IndexedRow implements BinaryRow, NullAwareGetters { int offset = getFieldOffset(pos); int length = columnLengths[pos]; long offsetAndLength = ((long) offset << 32) | length; - return BinarySegmentUtils.readBinaryArray(segments, 0, offsetAndLength); + DataType fieldType = fieldTypes[pos]; + if (fieldType instanceof ArrayType) { + DataType elementType = ((ArrayType) fieldType).getElementType(); + return BinarySegmentUtils.readBinaryArray( + segments, 0, offsetAndLength, new IndexedArray(elementType)); + } else { + throw new IllegalStateException( + "Field type at position " + pos + " is not ArrayType: " + fieldType); + } } // TODO: getMap() will be added in Issue #1973 @@ -392,16 +402,19 @@ public class IndexedRow implements BinaryRow, NullAwareGetters { @Override public InternalRow getRow(int pos, int numFields) { assertIndexIsValid(pos); - int index = getFieldOffset(pos); + int offset = getFieldOffset(pos); int length = columnLengths[pos]; - long offsetAndLength = ((long) index << 32) | length; - return BinarySegmentUtils.readBinaryRow(segments, 0, numFields, offsetAndLength); - } - - private DataType[] getDataTypes(int pos) { - return (fieldTypes[pos].getTypeRoot() == DataTypeRoot.ROW) - ? fieldTypes[pos].getChildren().toArray(new DataType[0]) - : fieldTypes; + long offsetAndLength = ((long) offset << 32) | length; + DataType fieldType = fieldTypes[pos]; + if (fieldType instanceof RowType) { + DataType[] nestedFieldTypes = + ((RowType) fieldType).getFieldTypes().toArray(new DataType[0]); + return BinarySegmentUtils.readIndexedRow( + segments, 0, offsetAndLength, nestedFieldTypes); + } else { + throw new IllegalStateException( + "Field type at position " + pos + " is not RowType: " + fieldType); + } } private void assertIndexIsValid(int index) { diff --git a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java index 6bc8f9ac3..43f70cbfb 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java @@ -26,6 +26,8 @@ import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.array.IndexedArray; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; @@ -202,18 +204,21 @@ public class IndexedRowReader { return Arrays.copyOfRange(bytes, 0, newLen); } - public InternalArray readArray() { + public InternalArray readArray(DataType elementType) { int length = readVarLengthFromVarLengthList(); MemorySegment[] segments = new MemorySegment[] {segment}; - InternalArray array = BinarySegmentUtils.readBinaryArray(segments, position, length); + InternalArray array = + BinarySegmentUtils.readBinaryArray( + segments, position, length, new IndexedArray(elementType)); position += length; return array; } - public InternalRow readRow(int numFields) { + public InternalRow readRow(DataType[] nestedFieldTypes) { int length = readVarLengthFromVarLengthList(); MemorySegment[] segments = new MemorySegment[] {segment}; - InternalRow row = BinarySegmentUtils.readBinaryRow(segments, position, numFields, length); + InternalRow row = + BinarySegmentUtils.readIndexedRow(segments, position, length, nestedFieldTypes); position += length; return row; } @@ -278,11 +283,13 @@ public class IndexedRowReader { fieldReader = (reader, pos) -> reader.readTimestampLtz(timestampLtzPrecision); break; case ARRAY: - fieldReader = (reader, pos) -> reader.readArray(); + DataType elementType = ((ArrayType) fieldType).getElementType(); + fieldReader = (reader, pos) -> reader.readArray(elementType); break; case ROW: - final int rowFieldCount = ((RowType) fieldType).getFieldCount(); - fieldReader = (reader, pos) -> reader.readRow(rowFieldCount); + DataType[] nestedFieldTypes = + ((RowType) fieldType).getFieldTypes().toArray(new DataType[0]); + fieldReader = (reader, pos) -> reader.readRow(nestedFieldTypes); break; case MAP: // TODO: Map type support will be added in Issue #1973 diff --git a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java index ec88bb142..12fb2f090 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java @@ -33,15 +33,12 @@ import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.row.serializer.ArraySerializer; import org.apache.fluss.row.serializer.RowSerializer; -import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; -import org.apache.fluss.types.DataTypeChecks; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.UnsafeUtils; import java.io.IOException; import java.io.OutputStream; -import java.io.Serializable; import java.util.Arrays; /** Writer for {@link IndexedRow}. */ @@ -326,103 +323,6 @@ public class IndexedRowWriter extends OutputStream // ------------------------------------------------------------------------------------------ - /** - * Creates an accessor for writing the elements of an indexed row writer during runtime. - * - * @param fieldType the field type of the indexed row - */ - public static FieldWriter createFieldWriter(DataType fieldType) { - final FieldWriter fieldWriter; - switch (fieldType.getTypeRoot()) { - case CHAR: - final int charLength = DataTypeChecks.getLength(fieldType); - fieldWriter = - (writer, pos, value) -> writer.writeChar((BinaryString) value, charLength); - break; - case STRING: - fieldWriter = (writer, pos, value) -> writer.writeString((BinaryString) value); - break; - case BOOLEAN: - fieldWriter = (writer, pos, value) -> writer.writeBoolean((boolean) value); - break; - case BINARY: - final int binaryLength = DataTypeChecks.getLength(fieldType); - fieldWriter = - (writer, pos, value) -> writer.writeBinary((byte[]) value, binaryLength); - break; - case BYTES: - fieldWriter = (writer, pos, value) -> writer.writeBytes((byte[]) value); - break; - case DECIMAL: - final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); - fieldWriter = - (writer, pos, value) -> - writer.writeDecimal((Decimal) value, decimalPrecision); - break; - case TINYINT: - fieldWriter = (writer, pos, value) -> writer.writeByte((byte) value); - break; - case SMALLINT: - fieldWriter = (writer, pos, value) -> writer.writeShort((short) value); - break; - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - fieldWriter = (writer, pos, value) -> writer.writeInt((int) value); - break; - case BIGINT: - fieldWriter = (writer, pos, value) -> writer.writeLong((long) value); - break; - case FLOAT: - fieldWriter = (writer, pos, value) -> writer.writeFloat((float) value); - break; - case DOUBLE: - fieldWriter = (writer, pos, value) -> writer.writeDouble((double) value); - break; - case TIMESTAMP_WITHOUT_TIME_ZONE: - final int timestampNtzPrecision = DataTypeChecks.getPrecision(fieldType); - fieldWriter = - (writer, pos, value) -> - writer.writeTimestampNtz( - (TimestampNtz) value, timestampNtzPrecision); - break; - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - final int timestampLtzPrecision = DataTypeChecks.getPrecision(fieldType); - fieldWriter = - (writer, pos, value) -> - writer.writeTimestampLtz( - (TimestampLtz) value, timestampLtzPrecision); - break; - case ARRAY: - final ArraySerializer arraySerializer = - new ArraySerializer(((ArrayType) fieldType).getElementType()); - fieldWriter = - (writer, pos, value) -> - writer.writeArray((InternalArray) value, arraySerializer); - break; - case ROW: - final DataType[] rowFieldTypes = - ((RowType) fieldType).getFieldTypes().toArray(new DataType[0]); - final RowSerializer rowSerializer = new RowSerializer(rowFieldTypes); - fieldWriter = - (writer, pos, value) -> writer.writeRow((InternalRow) value, rowSerializer); - break; - default: - throw new IllegalArgumentException("Unsupported type for IndexedRow: " + fieldType); - } - - if (!fieldType.isNullable()) { - return fieldWriter; - } - return (writer, pos, value) -> { - if (value == null) { - writer.setNullAt(pos); - } else { - fieldWriter.writeField(writer, pos, value); - } - }; - } - public static void serializeIndexedRow(IndexedRow row, OutputView target) throws IOException { int sizeInBytes = row.getSizeInBytes(); if (target instanceof MemorySegmentWritable) { @@ -433,9 +333,4 @@ public class IndexedRowWriter extends OutputStream target.write(bytes, 0, sizeInBytes); } } - - /** Accessor for writing the elements of an indexed row writer during runtime. */ - public interface FieldWriter extends Serializable { - void writeField(IndexedRowWriter writer, int pos, Object value); - } } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java b/fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java index ecfdcab0d..1b24d6e5c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java @@ -19,9 +19,13 @@ package org.apache.fluss.row.serializer; import org.apache.fluss.row.BinaryArray; import org.apache.fluss.row.BinaryArrayWriter; +import org.apache.fluss.row.BinaryRow.BinaryRowFormat; import org.apache.fluss.row.BinaryWriter; import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.array.AlignedArray; +import org.apache.fluss.row.array.CompactedArray; +import org.apache.fluss.row.array.IndexedArray; import org.apache.fluss.types.DataType; import java.io.Serializable; @@ -31,11 +35,13 @@ public class ArraySerializer implements Serializable { private static final long serialVersionUID = 1L; private final DataType eleType; + private final BinaryRowFormat rowFormat; private transient BinaryArraySerializer alignedSerializer; - public ArraySerializer(DataType eleType) { + public ArraySerializer(DataType eleType, BinaryRowFormat rowFormat) { this.eleType = eleType; + this.rowFormat = rowFormat; } public BinaryArray toBinaryArray(InternalArray from) { @@ -45,6 +51,19 @@ public class ArraySerializer implements Serializable { return alignedSerializer.toAlignedArray(from); } + private BinaryArray createBinaryArrayInstance() { + switch (rowFormat) { + case COMPACTED: + return new CompactedArray(eleType); + case INDEXED: + return new IndexedArray(eleType); + case ALIGNED: + return new AlignedArray(); + default: + throw new IllegalArgumentException("Unsupported row format: " + rowFormat); + } + } + // ------------------------------------------------------------------------------------------ /** Serializer function for AlignedArray. */ @@ -86,7 +105,7 @@ public class ArraySerializer implements Serializable { int numElements = from.size(); if (reuseArray == null) { - reuseArray = new BinaryArray(); + reuseArray = createBinaryArrayInstance(); } if (reuseWriter == null || reuseWriter.getNumElements() != numElements) { reuseWriter = @@ -101,7 +120,7 @@ public class ArraySerializer implements Serializable { elementGetter = InternalArray.createElementGetter(eleType); } if (valueWriter == null) { - valueWriter = BinaryWriter.createValueWriter(eleType); + valueWriter = BinaryWriter.createValueWriter(eleType, rowFormat); } for (int i = 0; i < numElements; i++) { diff --git a/fluss-common/src/main/java/org/apache/fluss/row/serializer/RowSerializer.java b/fluss-common/src/main/java/org/apache/fluss/row/serializer/RowSerializer.java index 34d9f0185..5e2d02e60 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/serializer/RowSerializer.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/serializer/RowSerializer.java @@ -18,10 +18,12 @@ package org.apache.fluss.row.serializer; import org.apache.fluss.row.BinaryRow; -import org.apache.fluss.row.BinaryWriter; +import org.apache.fluss.row.BinaryRow.BinaryRowFormat; import org.apache.fluss.row.InternalRow; -import org.apache.fluss.row.aligned.AlignedRow; -import org.apache.fluss.row.aligned.AlignedRowWriter; +import org.apache.fluss.row.encode.AlignedRowEncoder; +import org.apache.fluss.row.encode.CompactedRowEncoder; +import org.apache.fluss.row.encode.IndexedRowEncoder; +import org.apache.fluss.row.encode.RowEncoder; import org.apache.fluss.types.DataType; import java.io.Serializable; @@ -31,44 +33,69 @@ public class RowSerializer implements Serializable { private static final long serialVersionUID = 1L; private final DataType[] fieldTypes; + private final BinaryRowFormat format; - private transient AlignedRow reuseRow; - private transient AlignedRowWriter reuseWriter; - private transient BinaryWriter.ValueWriter[] valueWriters; + private transient BinaryRowSerializer serializer; - public RowSerializer(DataType[] fieldTypes) { + public RowSerializer(DataType[] fieldTypes, BinaryRowFormat format) { this.fieldTypes = fieldTypes; + this.format = format; } + /** + * Serialize the given {@link InternalRow} into {@link BinaryRow}. + * + * <p>The returned {@link BinaryRow} might reuse the memory from the input {@link InternalRow} + * if it is already a {@link BinaryRow}. + * + * <p>Otherwise, it will serialize a {@link BinaryRow} based on the specified {@link + * BinaryRowFormat}. + */ public BinaryRow toBinaryRow(InternalRow from) { if (from instanceof BinaryRow) { return (BinaryRow) from; } - int numFields = from.getFieldCount(); - if (reuseRow == null || reuseRow.getFieldCount() != numFields) { - reuseRow = new AlignedRow(numFields); - reuseWriter = new AlignedRowWriter(reuseRow); - } else { - reuseWriter.reset(); + if (serializer == null) { + serializer = new BinaryRowSerializer(fieldTypes, format); } - if (valueWriters == null || valueWriters.length != numFields) { - valueWriters = new BinaryWriter.ValueWriter[numFields]; - for (int i = 0; i < numFields; i++) { - valueWriters[i] = BinaryWriter.createValueWriter(fieldTypes[i]); + return serializer.toBinaryRow(from); + } + + /** + * Serializer function for BinaryRow, it delegates the actual encoding to different {@link + * RowEncoder} based on the specified format. + */ + private static class BinaryRowSerializer { + private final RowEncoder rowEncoder; + private final InternalRow.FieldGetter[] fieldGetters; + + private BinaryRowSerializer(DataType[] fieldTypes, BinaryRowFormat format) { + this.fieldGetters = new InternalRow.FieldGetter[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + this.fieldGetters[i] = InternalRow.createFieldGetter(fieldTypes[i], i); + } + switch (format) { + case COMPACTED: + this.rowEncoder = new CompactedRowEncoder(fieldTypes); + break; + case INDEXED: + this.rowEncoder = new IndexedRowEncoder(fieldTypes); + break; + case ALIGNED: + this.rowEncoder = new AlignedRowEncoder(fieldTypes); + break; + default: + throw new IllegalArgumentException("Unsupported binary row format: " + format); } } - for (int i = 0; i < numFields; i++) { - if (from.isNullAt(i)) { - reuseWriter.setNullAt(i); - } else { - Object field = InternalRow.createFieldGetter(fieldTypes[i], i).getFieldOrNull(from); - valueWriters[i].writeValue(reuseWriter, i, field); + public BinaryRow toBinaryRow(InternalRow from) { + rowEncoder.startNewRow(); + for (int i = 0; i < fieldGetters.length; i++) { + rowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(from)); } + return rowEncoder.finishRow(); } - reuseWriter.complete(); - - return reuseRow; } } diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java index fb91c32d2..956ae045d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java @@ -58,6 +58,7 @@ import org.apache.fluss.row.arrow.writers.ArrowTinyIntWriter; import org.apache.fluss.row.arrow.writers.ArrowVarBinaryWriter; import org.apache.fluss.row.arrow.writers.ArrowVarCharWriter; import org.apache.fluss.row.columnar.ColumnVector; +import org.apache.fluss.row.columnar.VectorizedColumnBatch; import org.apache.fluss.shaded.arrow.com.google.flatbuffers.FlatBufferBuilder; import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.MessageHeader; import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.RecordBatch; @@ -355,7 +356,7 @@ public class ArrowUtils { } } - public static ColumnVector createArrowColumnVector(ValueVector vector, DataType dataType) { + private static ColumnVector createArrowColumnVector(ValueVector vector, DataType dataType) { if (vector instanceof TinyIntVector) { return new ArrowTinyIntColumnVector((TinyIntVector) vector); } else if (vector instanceof SmallIntVector) { @@ -401,7 +402,15 @@ public class ArrowUtils { } else if (vector instanceof StructVector && dataType instanceof RowType) { RowType rowType = (RowType) dataType; - return new ArrowRowColumnVector((FieldVector) vector, rowType); + StructVector structVector = (StructVector) vector; + List<FieldVector> fieldVectors = structVector.getChildrenFromFields(); + ColumnVector[] columnVectors = new ColumnVector[fieldVectors.size()]; + for (int i = 0; i < fieldVectors.size(); i++) { + columnVectors[i] = + ArrowUtils.createArrowColumnVector( + fieldVectors.get(i), rowType.getTypeAt(i)); + } + return new ArrowRowColumnVector(structVector, new VectorizedColumnBatch(columnVectors)); } else { throw new UnsupportedOperationException( String.format("Unsupported type %s.", dataType)); diff --git a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java index 5f445c92a..14230b62e 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java @@ -17,6 +17,7 @@ package org.apache.fluss.row; +import org.apache.fluss.row.array.PrimitiveBinaryArray; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; @@ -103,7 +104,7 @@ public class BinaryArrayTest { @Test public void testWriteAndReadInt() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeInt(0, 10); @@ -119,7 +120,7 @@ public class BinaryArrayTest { @Test public void testWriteAndReadString() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); writer.writeString(0, BinaryString.fromString("hello")); @@ -133,7 +134,7 @@ public class BinaryArrayTest { @Test public void testSetNull() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8); writer.writeLong(0, 100L); @@ -151,7 +152,7 @@ public class BinaryArrayTest { @Test public void testSetAndGetDecimal() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); Decimal decimal1 = Decimal.fromUnscaledLong(123, 5, 2); @@ -168,7 +169,7 @@ public class BinaryArrayTest { @Test public void testSetAndGetTimestampNtz() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampNtz ts1 = TimestampNtz.fromMillis(1000L); @@ -185,7 +186,7 @@ public class BinaryArrayTest { @Test public void testSetAndGetTimestampLtz() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L); @@ -202,7 +203,7 @@ public class BinaryArrayTest { @Test public void testSetAndGetBinary() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); byte[] binary1 = {1, 2, 3}; @@ -217,32 +218,6 @@ public class BinaryArrayTest { assertThat(array.getBinary(1, 3)).isEqualTo(binary2); } - @Test - public void testCopy() { - int[] intArray = {1, 2, 3, 4, 5}; - BinaryArray original = BinaryArray.fromPrimitiveArray(intArray); - - BinaryArray copied = original.copy(); - - assertThat(copied.size()).isEqualTo(original.size()); - assertThat(copied.getInt(0)).isEqualTo(1); - assertThat(copied.getInt(4)).isEqualTo(5); - } - - @Test - public void testCopyWithReuse() { - int[] intArray = {1, 2, 3}; - BinaryArray original = BinaryArray.fromPrimitiveArray(intArray); - - BinaryArray reuse = new BinaryArray(); - BinaryArray copied = original.copy(reuse); - - assertThat(copied).isSameAs(reuse); - assertThat(copied.size()).isEqualTo(3); - assertThat(copied.getInt(0)).isEqualTo(1); - assertThat(copied.getInt(2)).isEqualTo(3); - } - @Test public void testHashCode() { int[] intArray = {1, 2, 3}; @@ -285,7 +260,7 @@ public class BinaryArrayTest { @Test public void testAnyNull() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeInt(0, 10); @@ -304,7 +279,7 @@ public class BinaryArrayTest { @Test public void testToArrayWithNullThrowsException() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeInt(0, 10); @@ -327,7 +302,7 @@ public class BinaryArrayTest { @Test public void testToObjectArrayWithNull() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeInt(0, 10); @@ -341,7 +316,7 @@ public class BinaryArrayTest { @Test public void testGetChar() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); writer.writeString(0, BinaryString.fromString("hello")); @@ -354,7 +329,7 @@ public class BinaryArrayTest { @Test public void testSetBoolean() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 1); writer.writeBoolean(0, true); @@ -369,7 +344,7 @@ public class BinaryArrayTest { @Test public void testSetByte() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 1); writer.writeByte(0, (byte) 1); @@ -384,7 +359,7 @@ public class BinaryArrayTest { @Test public void testSetShort() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 2); writer.writeShort(0, (short) 10); @@ -399,7 +374,7 @@ public class BinaryArrayTest { @Test public void testSetFloat() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeFloat(0, 1.5f); @@ -414,7 +389,7 @@ public class BinaryArrayTest { @Test public void testSetDouble() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8); writer.writeDouble(0, 1.1); @@ -454,7 +429,7 @@ public class BinaryArrayTest { int[] intArray = {1, 2, 3}; BinaryArray array1 = BinaryArray.fromPrimitiveArray(intArray); - BinaryArray array2 = new BinaryArray(); + BinaryArray array2 = new PrimitiveBinaryArray(); array2.pointTo(array1.getSegments(), array1.getOffset(), array1.getSizeInBytes()); assertThat(array2.size()).isEqualTo(3); @@ -465,7 +440,7 @@ public class BinaryArrayTest { @Test public void testHighPrecisionTimestamp() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampNtz ts1 = TimestampNtz.fromMillis(1000L, 123456); @@ -481,7 +456,7 @@ public class BinaryArrayTest { @Test public void testHighPrecisionTimestampLtz() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L, 123456); @@ -497,7 +472,7 @@ public class BinaryArrayTest { @Test public void testLargeDecimal() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); Decimal decimal1 = Decimal.fromBigDecimal(new java.math.BigDecimal("123.456"), 20, 3); @@ -513,7 +488,7 @@ public class BinaryArrayTest { @Test public void testSetNotNullAt() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8); writer.setNullLong(0); @@ -529,7 +504,7 @@ public class BinaryArrayTest { @Test public void testSetNullAt() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8); writer.writeLong(0, 100L); @@ -567,7 +542,7 @@ public class BinaryArrayTest { @Test public void testSetDecimalCompact() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); Decimal decimal1 = Decimal.fromUnscaledLong(123, 5, 2); @@ -586,7 +561,7 @@ public class BinaryArrayTest { @Test public void testSetTimestampNtzCompact() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampNtz ts1 = TimestampNtz.fromMillis(1000L); @@ -605,7 +580,7 @@ public class BinaryArrayTest { @Test public void testSetTimestampLtzCompact() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L); @@ -624,7 +599,7 @@ public class BinaryArrayTest { @Test public void testToObjectArrayWithNulls() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeInt(0, 100); @@ -683,7 +658,7 @@ public class BinaryArrayTest { @Test public void testSetNullLong() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); writer.setNullLong(0); writer.writeLong(1, 100L); @@ -695,7 +670,7 @@ public class BinaryArrayTest { @Test public void testSetNullInt() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4); writer.setNullInt(0); writer.writeInt(1, 100); @@ -707,7 +682,7 @@ public class BinaryArrayTest { @Test public void testSetNullFloat() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4); writer.setNullFloat(0); writer.writeFloat(1, 3.14f); @@ -719,7 +694,7 @@ public class BinaryArrayTest { @Test public void testSetNullDouble() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); writer.setNullDouble(0); writer.writeDouble(1, 3.14159); @@ -731,7 +706,7 @@ public class BinaryArrayTest { @Test public void testSetNullBoolean() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1); writer.setNullBoolean(0); writer.writeBoolean(1, true); @@ -743,7 +718,7 @@ public class BinaryArrayTest { @Test public void testSetNullByte() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1); writer.setNullByte(0); writer.writeByte(1, (byte) 42); @@ -755,7 +730,7 @@ public class BinaryArrayTest { @Test public void testSetNullShort() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2); writer.setNullShort(0); writer.writeShort(1, (short) 123); @@ -767,7 +742,7 @@ public class BinaryArrayTest { @Test public void testGetString() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); writer.writeString(0, BinaryString.fromString("test")); writer.complete(); @@ -777,7 +752,7 @@ public class BinaryArrayTest { @Test public void testGetBytes() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); byte[] bytes = {1, 2, 3, 4, 5}; writer.writeBinary(0, bytes, 10); @@ -807,18 +782,6 @@ public class BinaryArrayTest { assertThat(size).isGreaterThan(0); } - @Test - public void testCopyWithMultipleTypes() { - BinaryArray array = BinaryArray.fromPrimitiveArray(new int[] {1, 2, 3, 4, 5}); - - BinaryArray copied = array.copy(); - - assertThat(copied).isNotSameAs(array); - assertThat(copied.size()).isEqualTo(5); - assertThat(copied.getInt(0)).isEqualTo(1); - assertThat(copied.getInt(4)).isEqualTo(5); - } - @Test public void testHashCodeConsistency() { BinaryArray array1 = BinaryArray.fromPrimitiveArray(new int[] {1, 2, 3}); @@ -829,7 +792,7 @@ public class BinaryArrayTest { @Test public void testSetDecimalNonCompact() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); Decimal largeDecimal = @@ -849,7 +812,7 @@ public class BinaryArrayTest { @Test public void testSetTimestampNtzNonCompact() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampNtz ts1 = TimestampNtz.fromMillis(1000L, 123456); @@ -868,7 +831,7 @@ public class BinaryArrayTest { @Test public void testSetTimestampLtzNonCompact() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L, 123456); @@ -919,7 +882,7 @@ public class BinaryArrayTest { @Test public void testGetBinaryWithLength() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; writer.writeBytes(0, data); @@ -931,7 +894,7 @@ public class BinaryArrayTest { @Test public void testGetDecimalNonCompact() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); Decimal largeDecimal = @@ -945,7 +908,7 @@ public class BinaryArrayTest { @Test public void testGetTimestampNonCompact() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampNtz tsNtz = TimestampNtz.fromMillis(1000L, 123456); diff --git a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayWriterTest.java b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayWriterTest.java index 466346f98..a77367a47 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayWriterTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayWriterTest.java @@ -17,6 +17,7 @@ package org.apache.fluss.row; +import org.apache.fluss.row.array.PrimitiveBinaryArray; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; @@ -28,7 +29,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteAndReadAllTypes() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 10, 8); writer.writeBoolean(0, true); @@ -57,7 +58,7 @@ public class BinaryArrayWriterTest { @Test public void testReset() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeInt(0, 10); @@ -80,7 +81,7 @@ public class BinaryArrayWriterTest { @Test public void testSetNullMethods() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 8, 8); writer.setNullBoolean(0); @@ -108,7 +109,7 @@ public class BinaryArrayWriterTest { @Test public void testSetOffsetAndSize() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); writer.setOffsetAndSize(0, 100, 200); @@ -126,7 +127,7 @@ public class BinaryArrayWriterTest { @Test public void testGetFieldOffset() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 5, 8); int fieldOffset0 = writer.getFieldOffset(0); @@ -139,7 +140,7 @@ public class BinaryArrayWriterTest { @Test public void testGetNumElements() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 5, 4); assertThat(writer.getNumElements()).isEqualTo(5); @@ -147,7 +148,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteChar() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); writer.writeChar(0, BinaryString.fromString("hello"), 5); @@ -160,7 +161,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteBinary() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); byte[] binary1 = {1, 2, 3, 4}; @@ -176,7 +177,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteTimestampNtz() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampNtz ts1 = TimestampNtz.fromMillis(1000L, 123456); @@ -192,7 +193,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteTimestampLtz() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L, 123456); @@ -208,7 +209,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteNaNFloat() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeFloat(0, Float.NaN); @@ -223,7 +224,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteNaNDouble() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8); writer.writeDouble(0, Double.NaN); @@ -238,7 +239,7 @@ public class BinaryArrayWriterTest { @Test public void testAfterGrow() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); writer.writeLong(0, 100L); @@ -251,7 +252,7 @@ public class BinaryArrayWriterTest { @Test public void testCreateNullSetter() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 10, 8); BinaryArrayWriter.NullSetter booleanSetter = @@ -314,7 +315,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteLargeString() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); String largeString = new String(new char[100]).replace('\0', 'a'); @@ -326,7 +327,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteSmallString() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); String smallString = "ab"; @@ -338,7 +339,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteLargeBinary() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); byte[] largeBinary = new byte[100]; @@ -354,7 +355,7 @@ public class BinaryArrayWriterTest { @Test public void testSetNullBit() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 5, 4); writer.setNullBit(0); @@ -371,7 +372,7 @@ public class BinaryArrayWriterTest { @Test public void testMultipleResetCycles() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4); for (int cycle = 0; cycle < 5; cycle++) { @@ -390,7 +391,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteNullDecimal() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); writer.writeDecimal(0, null, 20); @@ -401,7 +402,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteNullTimestampNtz() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); writer.writeTimestampNtz(0, null, 9); @@ -412,7 +413,7 @@ public class BinaryArrayWriterTest { @Test public void testWriteNullTimestampLtz() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8); writer.writeTimestampLtz(0, null, 9); diff --git a/fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java b/fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java index d2501d6ca..d356eaaa8 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java @@ -17,6 +17,8 @@ package org.apache.fluss.row; +import org.apache.fluss.row.array.PrimitiveBinaryArray; +import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; @@ -29,28 +31,24 @@ public class BinaryWriterTest { @Test public void testCreateValueSetterForAllTypes() { - BinaryWriter.ValueWriter booleanSetter = - BinaryWriter.createValueWriter(DataTypes.BOOLEAN()); - BinaryWriter.ValueWriter tinyintSetter = - BinaryWriter.createValueWriter(DataTypes.TINYINT()); - BinaryWriter.ValueWriter smallintSetter = - BinaryWriter.createValueWriter(DataTypes.SMALLINT()); - BinaryWriter.ValueWriter intSetter = BinaryWriter.createValueWriter(DataTypes.INT()); - BinaryWriter.ValueWriter bigintSetter = BinaryWriter.createValueWriter(DataTypes.BIGINT()); - BinaryWriter.ValueWriter floatSetter = BinaryWriter.createValueWriter(DataTypes.FLOAT()); - BinaryWriter.ValueWriter doubleSetter = BinaryWriter.createValueWriter(DataTypes.DOUBLE()); - BinaryWriter.ValueWriter stringSetter = BinaryWriter.createValueWriter(DataTypes.STRING()); - BinaryWriter.ValueWriter charSetter = BinaryWriter.createValueWriter(DataTypes.CHAR(10)); - BinaryWriter.ValueWriter binarySetter = - BinaryWriter.createValueWriter(DataTypes.BINARY(10)); + BinaryWriter.ValueWriter booleanSetter = createPrimitiveValueWriter(DataTypes.BOOLEAN()); + BinaryWriter.ValueWriter tinyintSetter = createPrimitiveValueWriter(DataTypes.TINYINT()); + BinaryWriter.ValueWriter smallintSetter = createPrimitiveValueWriter(DataTypes.SMALLINT()); + BinaryWriter.ValueWriter intSetter = createPrimitiveValueWriter(DataTypes.INT()); + BinaryWriter.ValueWriter bigintSetter = createPrimitiveValueWriter(DataTypes.BIGINT()); + BinaryWriter.ValueWriter floatSetter = createPrimitiveValueWriter(DataTypes.FLOAT()); + BinaryWriter.ValueWriter doubleSetter = createPrimitiveValueWriter(DataTypes.DOUBLE()); + BinaryWriter.ValueWriter stringSetter = createPrimitiveValueWriter(DataTypes.STRING()); + BinaryWriter.ValueWriter charSetter = createPrimitiveValueWriter(DataTypes.CHAR(10)); + BinaryWriter.ValueWriter binarySetter = createPrimitiveValueWriter(DataTypes.BINARY(10)); BinaryWriter.ValueWriter decimalSetter = - BinaryWriter.createValueWriter(DataTypes.DECIMAL(5, 2)); + createPrimitiveValueWriter(DataTypes.DECIMAL(5, 2)); BinaryWriter.ValueWriter timestampNtzSetter = - BinaryWriter.createValueWriter(DataTypes.TIMESTAMP(3)); + createPrimitiveValueWriter(DataTypes.TIMESTAMP(3)); BinaryWriter.ValueWriter timestampLtzSetter = - BinaryWriter.createValueWriter(DataTypes.TIMESTAMP_LTZ(3)); - BinaryWriter.ValueWriter dateSetter = BinaryWriter.createValueWriter(DataTypes.DATE()); - BinaryWriter.ValueWriter timeSetter = BinaryWriter.createValueWriter(DataTypes.TIME()); + createPrimitiveValueWriter(DataTypes.TIMESTAMP_LTZ(3)); + BinaryWriter.ValueWriter dateSetter = createPrimitiveValueWriter(DataTypes.DATE()); + BinaryWriter.ValueWriter timeSetter = createPrimitiveValueWriter(DataTypes.TIME()); assertThat(booleanSetter).isNotNull(); assertThat(tinyintSetter).isNotNull(); @@ -71,10 +69,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithBooleanType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.BOOLEAN()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.BOOLEAN()); setter.writeValue(writer, 0, true); setter.writeValue(writer, 1, false); writer.complete(); @@ -85,10 +83,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithIntType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.INT()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.INT()); setter.writeValue(writer, 0, 100); setter.writeValue(writer, 1, 200); writer.complete(); @@ -99,10 +97,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithStringType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.STRING()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.STRING()); setter.writeValue(writer, 0, BinaryString.fromString("hello")); setter.writeValue(writer, 1, BinaryString.fromString("world")); writer.complete(); @@ -113,10 +111,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithDecimalType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.DECIMAL(5, 2)); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.DECIMAL(5, 2)); setter.writeValue(writer, 0, Decimal.fromUnscaledLong(123, 5, 2)); setter.writeValue(writer, 1, Decimal.fromUnscaledLong(456, 5, 2)); writer.complete(); @@ -129,7 +127,7 @@ public class BinaryWriterTest { public void testCreateValueSetterForMapThrowsException() { assertThatThrownBy( () -> - BinaryWriter.createValueWriter( + createPrimitiveValueWriter( DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("Map type is not supported yet"); @@ -137,10 +135,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithByteType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.TINYINT()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.TINYINT()); setter.writeValue(writer, 0, (byte) 10); setter.writeValue(writer, 1, (byte) 20); writer.complete(); @@ -151,10 +149,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithShortType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.SMALLINT()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.SMALLINT()); setter.writeValue(writer, 0, (short) 100); setter.writeValue(writer, 1, (short) 200); writer.complete(); @@ -165,10 +163,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithLongType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.BIGINT()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.BIGINT()); setter.writeValue(writer, 0, 1000L); setter.writeValue(writer, 1, 2000L); writer.complete(); @@ -179,10 +177,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithFloatType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.FLOAT()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.FLOAT()); setter.writeValue(writer, 0, 1.5f); setter.writeValue(writer, 1, 2.5f); writer.complete(); @@ -193,10 +191,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithDoubleType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.DOUBLE()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.DOUBLE()); setter.writeValue(writer, 0, 1.1); setter.writeValue(writer, 1, 2.2); writer.complete(); @@ -207,10 +205,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithCharType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.CHAR(5)); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.CHAR(5)); setter.writeValue(writer, 0, BinaryString.fromString("hello")); setter.writeValue(writer, 1, BinaryString.fromString("world")); writer.complete(); @@ -221,10 +219,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithBinaryType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.BINARY(3)); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.BINARY(3)); setter.writeValue(writer, 0, new byte[] {1, 2, 3}); setter.writeValue(writer, 1, new byte[] {4, 5, 6}); writer.complete(); @@ -235,10 +233,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithTimestampNtzType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.TIMESTAMP(3)); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.TIMESTAMP(3)); setter.writeValue(writer, 0, TimestampNtz.fromMillis(1000L)); setter.writeValue(writer, 1, TimestampNtz.fromMillis(2000L)); writer.complete(); @@ -249,11 +247,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithTimestampLtzType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); - BinaryWriter.ValueWriter setter = - BinaryWriter.createValueWriter(DataTypes.TIMESTAMP_LTZ(3)); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.TIMESTAMP_LTZ(3)); setter.writeValue(writer, 0, TimestampLtz.fromEpochMillis(1000L)); setter.writeValue(writer, 1, TimestampLtz.fromEpochMillis(2000L)); writer.complete(); @@ -264,10 +261,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithDateType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.DATE()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.DATE()); setter.writeValue(writer, 0, 18000); setter.writeValue(writer, 1, 18001); writer.complete(); @@ -278,10 +275,10 @@ public class BinaryWriterTest { @Test public void testValueSetterWithTimeType() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4); - BinaryWriter.ValueWriter setter = BinaryWriter.createValueWriter(DataTypes.TIME()); + BinaryWriter.ValueWriter setter = createPrimitiveValueWriter(DataTypes.TIME()); setter.writeValue(writer, 0, 3600000); setter.writeValue(writer, 1, 7200000); writer.complete(); @@ -289,4 +286,9 @@ public class BinaryWriterTest { assertThat(array.getInt(0)).isEqualTo(3600000); assertThat(array.getInt(1)).isEqualTo(7200000); } + + private static BinaryWriter.ValueWriter createPrimitiveValueWriter(DataType elementType) { + // use null for row format if there is no nested row type + return BinaryWriter.createValueWriter(elementType, null); + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayTest.java b/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayTest.java index b98ab932b..95e803b20 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayTest.java @@ -17,6 +17,7 @@ package org.apache.fluss.row; +import org.apache.fluss.row.array.PrimitiveBinaryArray; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; @@ -233,7 +234,7 @@ public class InternalArrayTest { @Test public void testBinaryArrayWithNullElementGetter() { - BinaryArray array = new BinaryArray(); + BinaryArray array = new PrimitiveBinaryArray(); BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4); writer.writeInt(0, 10); diff --git a/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java b/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java index f1c0a3bfe..d6d1b6a15 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java @@ -32,6 +32,7 @@ import java.time.LocalDate; import java.time.LocalTime; import java.util.Random; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED; import static org.apache.fluss.row.BinaryString.fromString; /** Test all types and generate test internal row. */ @@ -78,7 +79,7 @@ public class TestInternalRowGenerator { BinaryWriter.ValueWriter[] writers = new BinaryWriter.ValueWriter[dataTypes.length]; for (int i = 0; i < dataTypes.length; i++) { - writers[i] = BinaryWriter.createValueWriter(dataTypes[i]); + writers[i] = BinaryWriter.createValueWriter(dataTypes[i], INDEXED); } Random rnd = new Random(); diff --git a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java index 6836bcb60..519fa2fd8 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.Random; import java.util.Set; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.ALIGNED; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link AlignedRow}. */ @@ -546,7 +547,7 @@ class AlignedRowTest { AlignedRowWriter writer = new AlignedRowWriter(row); BinaryWriter.ValueWriter[] fieldSetters = new BinaryWriter.ValueWriter[10]; for (int i = 0; i < fieldTypes.length; i++) { - fieldSetters[i] = BinaryWriter.createValueWriter(fieldTypes[i]); + fieldSetters[i] = BinaryWriter.createValueWriter(fieldTypes[i], ALIGNED); } // Test static write method for different data types diff --git a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowDeserializerTest.java b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowDeserializerTest.java index c280c8ac8..20bfb633d 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowDeserializerTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowDeserializerTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.array.PrimitiveBinaryArray; import org.apache.fluss.row.serializer.ArraySerializer; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypes; @@ -34,6 +35,7 @@ import org.junit.jupiter.api.Test; import java.math.BigDecimal; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link CompactedRowDeserializer}. */ @@ -923,7 +925,7 @@ public class CompactedRowDeserializerTest { CompactedRowDeserializer deserializer = new CompactedRowDeserializer(types); BinaryArray intArray = BinaryArray.fromPrimitiveArray(new int[] {1, 2, 3, 4, 5}); - ArraySerializer arraySerializer = new ArraySerializer(DataTypes.INT()); + ArraySerializer arraySerializer = new ArraySerializer(DataTypes.INT(), COMPACTED); CompactedRowWriter writer = new CompactedRowWriter(types.length); writer.writeInt(100); @@ -948,14 +950,14 @@ public class CompactedRowDeserializerTest { DataType[] types = {DataTypes.ARRAY(DataTypes.STRING())}; CompactedRowDeserializer deserializer = new CompactedRowDeserializer(types); - BinaryArray strArray = new BinaryArray(); + BinaryArray strArray = new PrimitiveBinaryArray(); BinaryArrayWriter strArrayWriter = new BinaryArrayWriter(strArray, 3, 8); strArrayWriter.writeString(0, BinaryString.fromString("hello")); strArrayWriter.writeString(1, BinaryString.fromString("world")); strArrayWriter.writeString(2, BinaryString.fromString("test")); strArrayWriter.complete(); - ArraySerializer arraySerializer = new ArraySerializer(DataTypes.STRING()); + ArraySerializer arraySerializer = new ArraySerializer(DataTypes.STRING(), COMPACTED); CompactedRowWriter writer = new CompactedRowWriter(types.length); writer.writeArray(strArray, arraySerializer); diff --git a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowWriterTest.java b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowWriterTest.java index 0dc9ea7bc..d84460ba4 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowWriterTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowWriterTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test; import java.util.Random; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link CompactedRowWriter}. */ @@ -97,7 +98,7 @@ class CompactedRowWriterTest { new CompactedRowReader.FieldReader[allDataTypes.length]; for (int i = 0; i < allDataTypes.length; i++) { getters[i] = InternalRow.createFieldGetter(allDataTypes[i], i); - writers[i] = BinaryWriter.createValueWriter(allDataTypes[i]); + writers[i] = BinaryWriter.createValueWriter(allDataTypes[i], COMPACTED); readers[i] = CompactedRowReader.createFieldReader(allDataTypes[i]); } for (int i = 0; i < 1000; i++) { diff --git a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java index 25cadcdab..bb36c1f77 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java @@ -35,6 +35,7 @@ import java.math.BigInteger; import java.time.LocalDate; import java.time.LocalTime; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED; import static org.apache.fluss.row.BinaryString.fromString; import static org.apache.fluss.row.TestInternalRowGenerator.createAllTypes; import static org.apache.fluss.row.indexed.IndexedRowTest.assertAllTypeEquals; @@ -69,7 +70,7 @@ public class IndexedRowReaderTest { IndexedRowReader.FieldReader[] readers = new IndexedRowReader.FieldReader[dataTypes.length]; for (int i = 0; i < dataTypes.length; i++) { readers[i] = IndexedRowReader.createFieldReader(dataTypes[i]); - writers[i] = BinaryWriter.createValueWriter(dataTypes[i]); + writers[i] = BinaryWriter.createValueWriter(dataTypes[i], INDEXED); } IndexedRowWriter writer1 = new IndexedRowWriter(dataTypes); @@ -105,10 +106,11 @@ public class IndexedRowReaderTest { assertThat(reader.readTimestampNtz(5).toString()).isEqualTo("2023-10-25T12:01:13.182"); assertThat(reader.readTimestampLtz(1).toString()).isEqualTo("2023-10-25T12:01:13.182Z"); assertThat(reader.readTimestampLtz(5).toString()).isEqualTo("2023-10-25T12:01:13.182Z"); - assertThatArray(reader.readArray()) + assertThatArray(reader.readArray(dataTypes[19])) .withElementType(DataTypes.INT()) .isEqualTo(GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234)); - InternalRow nestedRow = reader.readRow(3); + InternalRow nestedRow = + reader.readRow(dataTypes[20].getChildren().toArray(new DataType[0])); GenericRow expectedInnerRow = GenericRow.of(20); GenericRow expectedNestedRow = GenericRow.of(123, expectedInnerRow, fromString("Test")); assertThatRow(nestedRow) diff --git a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java index b0f676ff7..9aea09e38 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java @@ -39,6 +39,7 @@ import java.math.BigInteger; import java.time.LocalDate; import java.time.LocalTime; +import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED; import static org.apache.fluss.row.BinaryString.fromString; import static org.apache.fluss.row.TestInternalRowGenerator.createAllTypes; import static org.apache.fluss.testutils.InternalArrayAssert.assertThatArray; @@ -121,15 +122,15 @@ public class IndexedRowTest { row.pointTo(writer.segment(), 0, writer.position()); InternalRow.FieldGetter[] fieldGetter = new InternalRow.FieldGetter[dataTypes.length]; - IndexedRowWriter.FieldWriter[] writers = new IndexedRowWriter.FieldWriter[dataTypes.length]; + BinaryWriter.ValueWriter[] writers = new BinaryWriter.ValueWriter[dataTypes.length]; for (int i = 0; i < dataTypes.length; i++) { fieldGetter[i] = InternalRow.createFieldGetter(dataTypes[i], i); - writers[i] = IndexedRowWriter.createFieldWriter(dataTypes[i]); + writers[i] = BinaryWriter.createValueWriter(dataTypes[i], INDEXED); } IndexedRowWriter writer1 = new IndexedRowWriter(dataTypes); for (int i = 0; i < dataTypes.length; i++) { - writers[i].writeField(writer1, i, fieldGetter[i].getFieldOrNull(row)); + writers[i].writeValue(writer1, i, fieldGetter[i].getFieldOrNull(row)); } IndexedRow row1 = new IndexedRow(dataTypes); @@ -178,7 +179,7 @@ public class IndexedRowTest { BinaryWriter.ValueWriter[] writers = new BinaryWriter.ValueWriter[dataTypes.length]; for (int i = 0; i < dataTypes.length; i++) { - writers[i] = BinaryWriter.createValueWriter(dataTypes[i]); + writers[i] = BinaryWriter.createValueWriter(dataTypes[i], INDEXED); } writers[0].writeValue(writer, 0, true);
