This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-2079
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit d24ab9c438039cf8c6ce7ed5b54b8d4c29897f25
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 22 23:32:15 2025 +0800

    use the same row format in nested row (like, row<array<row>>)
---
 .../java/org/apache/fluss/row/BinaryArray.java     |  50 +++++----
 .../main/java/org/apache/fluss/row/BinaryRow.java  |  19 ++++
 .../org/apache/fluss/row/BinarySegmentUtils.java   |  58 ++++------
 .../java/org/apache/fluss/row/BinaryWriter.java    |  22 +++-
 .../org/apache/fluss/row/aligned/AlignedRow.java   |   6 +-
 .../org/apache/fluss/row/array/AlignedArray.java   |  42 +++++++
 .../org/apache/fluss/row/array/CompactedArray.java |  70 ++++++++++++
 .../org/apache/fluss/row/array/IndexedArray.java   |  69 ++++++++++++
 .../fluss/row/array/PrimitiveBinaryArray.java      |  43 ++++++++
 .../row/arrow/vectors/ArrowRowColumnVector.java    |  35 +-----
 .../fluss/row/compacted/CompactedKeyWriter.java    |   4 +-
 .../fluss/row/compacted/CompactedRowReader.java    |  23 ++--
 .../apache/fluss/row/encode/AlignedRowEncoder.java |  69 ++++++++++++
 .../fluss/row/encode/CompactedRowEncoder.java      |   4 +-
 .../apache/fluss/row/encode/IndexedRowEncoder.java |   4 +-
 .../org/apache/fluss/row/indexed/IndexedRow.java   |  37 +++++--
 .../apache/fluss/row/indexed/IndexedRowReader.java |  21 ++--
 .../apache/fluss/row/indexed/IndexedRowWriter.java | 105 ------------------
 .../fluss/row/serializer/ArraySerializer.java      |  25 ++++-
 .../apache/fluss/row/serializer/RowSerializer.java |  79 +++++++++-----
 .../java/org/apache/fluss/utils/ArrowUtils.java    |  13 ++-
 .../java/org/apache/fluss/row/BinaryArrayTest.java | 121 +++++++--------------
 .../apache/fluss/row/BinaryArrayWriterTest.java    |  45 ++++----
 .../org/apache/fluss/row/BinaryWriterTest.java     | 104 +++++++++---------
 .../org/apache/fluss/row/InternalArrayTest.java    |   3 +-
 .../apache/fluss/row/TestInternalRowGenerator.java |   3 +-
 .../apache/fluss/row/aligned/AlignedRowTest.java   |   3 +-
 .../compacted/CompactedRowDeserializerTest.java    |   8 +-
 .../row/compacted/CompactedRowWriterTest.java      |   3 +-
 .../fluss/row/indexed/IndexedRowReaderTest.java    |   8 +-
 .../apache/fluss/row/indexed/IndexedRowTest.java   |   9 +-
 31 files changed, 675 insertions(+), 430 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java 
b/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java
index f2f3d4663..0aef1b0b6 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java
@@ -19,6 +19,7 @@ package org.apache.fluss.row;
 
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
 import org.apache.fluss.types.DataType;
 
 import java.lang.reflect.Array;
@@ -48,7 +49,7 @@ import static 
org.apache.fluss.utils.Preconditions.checkArgument;
  * @since 0.9
  */
 @PublicEvolving
-public final class BinaryArray extends BinarySection
+public abstract class BinaryArray extends BinarySection
         implements InternalArray, MemoryAwareGetters, DataSetters {
 
     private static final long serialVersionUID = 1L;
@@ -111,9 +112,7 @@ public final class BinaryArray extends BinarySection
     /** The position to start storing array elements. */
     private transient int elementOffset;
 
-    public BinaryArray() {}
-
-    private void assertIndexIsValid(int ordinal) {
+    protected void assertIndexIsValid(int ordinal) {
         assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
         assert ordinal < size : "ordinal (" + ordinal + ") should < " + size;
     }
@@ -265,16 +264,14 @@ public final class BinaryArray extends BinarySection
     @Override
     public InternalArray getArray(int pos) {
         assertIndexIsValid(pos);
-        return BinarySegmentUtils.readBinaryArray(segments, offset, 
getLong(pos));
+        return BinarySegmentUtils.readBinaryArray(
+                segments, offset, getLong(pos), createNestedArrayInstance());
     }
 
-    // TODO: getMap() will be added in Issue #1973
+    /** Creates a nested {@link BinaryArray} with the nested data type 
information. */
+    protected abstract BinaryArray createNestedArrayInstance();
 
-    @Override
-    public InternalRow getRow(int pos, int numFields) {
-        assertIndexIsValid(pos);
-        return BinarySegmentUtils.readBinaryRow(segments, offset, numFields, 
getLong(pos));
-    }
+    // TODO: getMap() will be added in Issue #1973
 
     @Override
     public boolean getBoolean(int pos) {
@@ -550,21 +547,26 @@ public final class BinaryArray extends BinarySection
         return values;
     }
 
-    public BinaryArray copy() {
-        return copy(new BinaryArray());
-    }
-
-    public BinaryArray copy(BinaryArray reuse) {
-        byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, 
sizeInBytes);
-        reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
-        return reuse;
-    }
-
     @Override
     public int hashCode() {
         return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        // override equals and only checks the other object is instance of 
BinaryArray
+        if (!(o instanceof BinaryArray)) {
+            return false;
+        }
+        final BinarySection that = (BinarySection) o;
+        return sizeInBytes == that.sizeInBytes
+                && BinarySegmentUtils.equals(
+                        segments, offset, that.segments, that.offset, 
sizeInBytes);
+    }
+
     // 
------------------------------------------------------------------------------------------
     // Construction Utilities
     // 
------------------------------------------------------------------------------------------
@@ -616,13 +618,13 @@ public final class BinaryArray extends BinarySection
         UNSAFE.copyMemory(
                 arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, 
valueRegionInBytes);
 
-        BinaryArray result = new BinaryArray();
+        BinaryArray result = new PrimitiveBinaryArray();
         result.pointTo(MemorySegment.wrap(data), 0, (int) totalSize);
         return result;
     }
 
     public static BinaryArray fromLongArray(Long[] arr) {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.length, 8);
         for (int i = 0; i < arr.length; i++) {
             Long v = arr[i];
@@ -641,7 +643,7 @@ public final class BinaryArray extends BinarySection
             return (BinaryArray) arr;
         }
 
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.size(), 8);
         for (int i = 0; i < arr.size(); i++) {
             if (arr.isNullAt(i)) {
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java 
b/fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java
index a7afaa8a3..2235a34a7 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java
@@ -18,6 +18,9 @@
 package org.apache.fluss.row;
 
 import org.apache.fluss.memory.MemorySegment;
+import org.apache.fluss.row.aligned.AlignedRow;
+import org.apache.fluss.row.compacted.CompactedRow;
+import org.apache.fluss.row.indexed.IndexedRow;
 
 /**
  * A binary format {@link InternalRow} that is backed on {@link MemorySegment} 
and supports all
@@ -57,4 +60,20 @@ public interface BinaryRow extends InternalRow, 
MemoryAwareGetters {
      * @param sizeInBytes The size of the row.
      */
     void pointTo(MemorySegment[] segments, int offset, int sizeInBytes);
+
+    /**
+     * The binary row format types, it indicates the generated {@link 
BinaryRow} type by the {@link
+     * BinaryWriter}.
+     */
+    enum BinaryRowFormat {
+
+        /** Compacted binary row format, see {@link CompactedRow}. */
+        COMPACTED,
+
+        /** Aligned binary row format, see {@link AlignedRow}. */
+        ALIGNED,
+
+        /** Indexed binary row format, see {@link IndexedRow}. */
+        INDEXED
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
index 41b86ff18..bd532634a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java
@@ -20,6 +20,7 @@ package org.apache.fluss.row;
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.OutputView;
+import org.apache.fluss.row.aligned.AlignedRow;
 import org.apache.fluss.row.compacted.CompactedRow;
 import org.apache.fluss.row.indexed.IndexedRow;
 import org.apache.fluss.types.DataType;
@@ -989,14 +990,16 @@ public final class BinarySegmentUtils {
         return Decimal.fromUnscaledBytes(bytes, precision, scale);
     }
 
-    /** Gets an instance of {@link InternalArray} from underlying {@link 
MemorySegment}. */
+    /**
+     * Read the array data into the reused {@link BinaryArray} instance from 
underlying {@link
+     * MemorySegment}.
+     */
     public static BinaryArray readBinaryArray(
-            MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+            MemorySegment[] segments, int baseOffset, long offsetAndSize, 
BinaryArray reusedArray) {
         final int size = ((int) offsetAndSize);
         int offset = (int) (offsetAndSize >> 32);
-        BinaryArray array = new BinaryArray();
-        array.pointTo(segments, offset + baseOffset, size);
-        return array;
+        reusedArray.pointTo(segments, offset + baseOffset, size);
+        return reusedArray;
     }
 
     /** Read map data from segments. */
@@ -1013,48 +1016,33 @@ public final class BinarySegmentUtils {
                 "Map type is not supported yet. Will be added in Issue 
#1973.");
     }
 
-    /** Gets an instance of {@link BinaryRow} from underlying {@link 
MemorySegment}. */
-    public static BinaryRow readBinaryRow(
-            MemorySegment[] segments, int baseOffset, int numFields, long 
offsetAndSize) {
+    /** Read aligned row from segments. */
+    public static InternalRow readAlignedRow(
+            MemorySegment[] segments, int baseOffset, long offsetAndSize, int 
numFields) {
         final int size = ((int) offsetAndSize);
         int offset = (int) (offsetAndSize >> 32);
-        org.apache.fluss.row.aligned.AlignedRow row =
-                new org.apache.fluss.row.aligned.AlignedRow(numFields);
-        row.pointTo(segments, offset + baseOffset, size);
+        AlignedRow row = new AlignedRow(numFields);
+        row.pointTo(segments, baseOffset + offset, size);
         return row;
     }
 
-    /** Read indexed row data from segments. */
+    /** Read indexed row from segments. */
     public static InternalRow readIndexedRow(
-            MemorySegment[] segments,
-            int offset,
-            int numBytes,
-            int numFields,
-            DataType[] fieldTypes) {
-        return readIndexedRowData(segments, offset, numBytes, numFields, 
fieldTypes);
-    }
-
-    /** Read IndexedRow data from segments. */
-    public static IndexedRow readIndexedRowData(
-            MemorySegment[] segments,
-            int offset,
-            int numBytes,
-            int numFields,
-            DataType[] fieldTypes) {
+            MemorySegment[] segments, int baseOffset, long offsetAndSize, 
DataType[] fieldTypes) {
+        final int size = ((int) offsetAndSize);
+        int offset = (int) (offsetAndSize >> 32);
         IndexedRow row = new IndexedRow(fieldTypes);
-        row.pointTo(segments, offset, numBytes);
+        row.pointTo(segments, baseOffset + offset, size);
         return row;
     }
 
-    /** Read compacted row data from segments. */
+    /** Read compacted row from segments. */
     public static InternalRow readCompactedRow(
-            MemorySegment[] segments,
-            int offset,
-            int numBytes,
-            int numFields,
-            DataType[] fieldTypes) {
+            MemorySegment[] segments, int baseOffset, long offsetAndSize, 
DataType[] fieldTypes) {
+        final int size = ((int) offsetAndSize);
+        int offset = (int) (offsetAndSize >> 32);
         CompactedRow row = new CompactedRow(fieldTypes);
-        row.pointTo(segments, offset, numBytes);
+        row.pointTo(segments, baseOffset + offset, size);
         return row;
     }
 
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java 
b/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java
index f59d57c98..5c542f92f 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java
@@ -18,12 +18,15 @@
 package org.apache.fluss.row;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
 import org.apache.fluss.row.serializer.ArraySerializer;
 import org.apache.fluss.row.serializer.RowSerializer;
 import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 import static org.apache.fluss.types.DataTypeChecks.getLength;
@@ -87,9 +90,12 @@ public interface BinaryWriter {
      * Creates an accessor for setting the elements of a binary writer during 
runtime.
      *
      * @param elementType the element type
+     * @param rowFormat the binary row format, it is required when the element 
type has nested row
+     *     type, otherwise, {@link IllegalArgumentException} will be thrown.
      */
-    static BinaryWriter.ValueWriter createValueWriter(DataType elementType) {
-        BinaryWriter.ValueWriter valueWriter = 
createNotNullValueWriter(elementType);
+    static BinaryWriter.ValueWriter createValueWriter(
+            DataType elementType, BinaryRowFormat rowFormat) {
+        BinaryWriter.ValueWriter valueWriter = 
createNotNullValueWriter(elementType, rowFormat);
         if (!elementType.isNullable()) {
             return valueWriter;
         }
@@ -108,7 +114,8 @@ public interface BinaryWriter {
      *
      * @param elementType the element type
      */
-    static BinaryWriter.ValueWriter createNotNullValueWriter(DataType 
elementType) {
+    private static BinaryWriter.ValueWriter createNotNullValueWriter(
+            DataType elementType, @Nullable BinaryRowFormat rowFormat) {
         switch (elementType.getTypeRoot()) {
             case CHAR:
                 int charLength = getLength(elementType);
@@ -152,7 +159,7 @@ public interface BinaryWriter {
                         writer.writeTimestampLtz(pos, (TimestampLtz) value, 
timestampLtzPrecision);
             case ARRAY:
                 final ArraySerializer arraySerializer =
-                        new ArraySerializer(((ArrayType) 
elementType).getElementType());
+                        new ArraySerializer(((ArrayType) 
elementType).getElementType(), rowFormat);
                 return (writer, pos, value) ->
                         writer.writeArray(pos, (InternalArray) value, 
arraySerializer);
 
@@ -161,9 +168,14 @@ public interface BinaryWriter {
                 throw new UnsupportedOperationException(
                         "Map type is not supported yet. Will be added in Issue 
#1973.");
             case ROW:
+                if (rowFormat == null) {
+                    throw new IllegalArgumentException(
+                            "Binary row format is required to write row.");
+                }
                 final RowType rowType = (RowType) elementType;
                 final RowSerializer rowSerializer =
-                        new RowSerializer(rowType.getFieldTypes().toArray(new 
DataType[0]));
+                        new RowSerializer(
+                                rowType.getFieldTypes().toArray(new 
DataType[0]), rowFormat);
                 return (writer, pos, value) ->
                         writer.writeRow(pos, (InternalRow) value, 
rowSerializer);
             default:
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java 
b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
index 314ed4714..dad2b5822 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java
@@ -30,6 +30,7 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.NullAwareGetters;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.array.AlignedArray;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DecimalType;
 import org.apache.fluss.types.LocalZonedTimestampType;
@@ -390,7 +391,8 @@ public final class AlignedRow extends BinarySection
         assertIndexIsValid(pos);
         int fieldOffset = getFieldOffset(pos);
         final long offsetAndSize = segments[0].getLong(fieldOffset);
-        return BinarySegmentUtils.readBinaryArray(segments, offset, 
offsetAndSize);
+        return BinarySegmentUtils.readBinaryArray(
+                segments, offset, offsetAndSize, new AlignedArray());
     }
 
     // TODO: getMap() will be added in Issue #1973
@@ -400,7 +402,7 @@ public final class AlignedRow extends BinarySection
         assertIndexIsValid(pos);
         int fieldOffset = getFieldOffset(pos);
         final long offsetAndSize = segments[0].getLong(fieldOffset);
-        return BinarySegmentUtils.readBinaryRow(segments, offset, numFields, 
offsetAndSize);
+        return BinarySegmentUtils.readAlignedRow(segments, offset, 
offsetAndSize, numFields);
     }
 
     /** The bit is 1 when the field is null. Default is 0. */
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/array/AlignedArray.java 
b/fluss-common/src/main/java/org/apache/fluss/row/array/AlignedArray.java
new file mode 100644
index 000000000..9919f46b3
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/array/AlignedArray.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.array;
+
+import org.apache.fluss.row.BinaryArray;
+import org.apache.fluss.row.BinarySegmentUtils;
+import org.apache.fluss.row.InternalRow;
+
+/**
+ * A {@link BinaryArray} that uses {@link 
org.apache.fluss.row.aligned.AlignedRow} as the binary
+ * format for arrays of nested row type.
+ */
+public class AlignedArray extends BinaryArray {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return BinarySegmentUtils.readAlignedRow(segments, offset, 
getLong(pos), numFields);
+    }
+
+    @Override
+    protected BinaryArray createNestedArrayInstance() {
+        return new AlignedArray();
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/array/CompactedArray.java 
b/fluss-common/src/main/java/org/apache/fluss/row/array/CompactedArray.java
new file mode 100644
index 000000000..566d74cc0
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/array/CompactedArray.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.array;
+
+import org.apache.fluss.row.BinaryArray;
+import org.apache.fluss.row.BinarySegmentUtils;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+/**
+ * A {@link BinaryArray} that uses {@link 
org.apache.fluss.row.compacted.CompactedRow} as the binary
+ * format for arrays of nested row type.
+ */
+public class CompactedArray extends BinaryArray {
+
+    private final DataType elementType;
+
+    private transient DataType[] nestedFields;
+
+    public CompactedArray(DataType elementType) {
+        this.elementType = elementType;
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        assertIndexIsValid(pos);
+        if (elementType instanceof RowType) {
+            if (nestedFields == null) {
+                nestedFields = ((RowType) 
elementType).getFieldTypes().toArray(new DataType[0]);
+            }
+            if (nestedFields.length != numFields) {
+                throw new IllegalArgumentException(
+                        "Unexpected number of fields " + numFields + " for " + 
elementType);
+            }
+            return BinarySegmentUtils.readCompactedRow(
+                    segments, offset, getLong(pos), nestedFields);
+        } else {
+            throw new IllegalArgumentException("Can not get row from Array of 
type " + elementType);
+        }
+    }
+
+    @Override
+    protected BinaryArray createNestedArrayInstance() {
+        if (elementType instanceof ArrayType) {
+            return new CompactedArray(((ArrayType) 
elementType).getElementType());
+        } else {
+            throw new IllegalArgumentException(
+                    "Can not get nested array from Array of type " + 
elementType);
+        }
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/array/IndexedArray.java 
b/fluss-common/src/main/java/org/apache/fluss/row/array/IndexedArray.java
new file mode 100644
index 000000000..860da18fb
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/array/IndexedArray.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.array;
+
+import org.apache.fluss.row.BinaryArray;
+import org.apache.fluss.row.BinarySegmentUtils;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+/**
+ * A {@link BinaryArray} that uses {@link 
org.apache.fluss.row.indexed.IndexedRow} as the binary
+ * format for arrays of nested row type.
+ */
+public class IndexedArray extends BinaryArray {
+
+    private final DataType elementType;
+
+    private transient DataType[] nestedFields;
+
+    public IndexedArray(DataType elementType) {
+        this.elementType = elementType;
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        assertIndexIsValid(pos);
+        if (elementType instanceof RowType) {
+            if (nestedFields == null) {
+                nestedFields = ((RowType) 
elementType).getFieldTypes().toArray(new DataType[0]);
+            }
+            if (nestedFields.length != numFields) {
+                throw new IllegalArgumentException(
+                        "Unexpected number of fields " + numFields + " for " + 
elementType);
+            }
+            return BinarySegmentUtils.readIndexedRow(segments, offset, 
getLong(pos), nestedFields);
+        } else {
+            throw new IllegalArgumentException("Can not get row from Array of 
type " + elementType);
+        }
+    }
+
+    @Override
+    protected BinaryArray createNestedArrayInstance() {
+        if (elementType instanceof ArrayType) {
+            return new IndexedArray(((ArrayType) 
elementType).getElementType());
+        } else {
+            throw new IllegalArgumentException(
+                    "Can not get nested array from Array of type " + 
elementType);
+        }
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/array/PrimitiveBinaryArray.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/array/PrimitiveBinaryArray.java
new file mode 100644
index 000000000..d91a0594d
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/array/PrimitiveBinaryArray.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.array;
+
+import org.apache.fluss.row.BinaryArray;
+import org.apache.fluss.row.InternalRow;
+
+/**
+ * A BinaryArray implementation for primitive types (except complex types) 
which does not support
+ * getRow operation.
+ */
+public class PrimitiveBinaryArray extends BinaryArray {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        throw new IllegalArgumentException("Can not get nested row from array 
of primitive type.");
+    }
+
+    @Override
+    protected BinaryArray createNestedArrayInstance() {
+        // this should never be called from a primitive array,
+        // however, we still return a placeholder
+        return new PrimitiveBinaryArray();
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowRowColumnVector.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowRowColumnVector.java
index de5b4155f..0c325a53f 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowRowColumnVector.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowRowColumnVector.java
@@ -19,53 +19,26 @@
 package org.apache.fluss.row.arrow.vectors;
 
 import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.columnar.ColumnVector;
 import org.apache.fluss.row.columnar.ColumnarRow;
 import org.apache.fluss.row.columnar.RowColumnVector;
 import org.apache.fluss.row.columnar.VectorizedColumnBatch;
-import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
 import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
-import org.apache.fluss.types.RowType;
-import org.apache.fluss.utils.ArrowUtils;
-
-import java.util.List;
 
 /** ArrowRowColumnVector is a wrapper class for Arrow RowVector. */
 public class ArrowRowColumnVector implements RowColumnVector {
-    private boolean inited = false;
-    private final FieldVector vector;
-    private final RowType rowType;
-    private VectorizedColumnBatch vectorizedColumnBatch;
+    private final StructVector vector;
+    private final VectorizedColumnBatch vectorizedColumnBatch;
 
-    public ArrowRowColumnVector(FieldVector vector, RowType rowType) {
+    public ArrowRowColumnVector(StructVector vector, VectorizedColumnBatch 
columnBatch) {
         this.vector = vector;
-        this.rowType = rowType;
-    }
-
-    private void init() {
-        if (!inited) {
-            List<FieldVector> children = ((StructVector) 
vector).getChildrenFromFields();
-            ColumnVector[] vectors = new ColumnVector[children.size()];
-            for (int i = 0; i < children.size(); i++) {
-                vectors[i] =
-                        ArrowUtils.createArrowColumnVector(children.get(i), 
rowType.getTypeAt(i));
-            }
-            this.vectorizedColumnBatch = new VectorizedColumnBatch(vectors);
-            inited = true;
-        }
+        this.vectorizedColumnBatch = columnBatch;
     }
 
     @Override
     public InternalRow getRow(int i) {
-        init();
         return new ColumnarRow(vectorizedColumnBatch, i);
     }
 
-    public VectorizedColumnBatch getBatch() {
-        init();
-        return vectorizedColumnBatch;
-    }
-
     @Override
     public boolean isNullAt(int i) {
         return vector.isNull(i);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedKeyWriter.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedKeyWriter.java
index ba3e088c4..d960785e0 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedKeyWriter.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedKeyWriter.java
@@ -20,6 +20,8 @@ package org.apache.fluss.row.compacted;
 import org.apache.fluss.row.BinaryWriter;
 import org.apache.fluss.types.DataType;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED;
+
 /**
  * A wrapping of {@link CompactedRowWriter} used to encode key columns.
  *
@@ -35,7 +37,7 @@ public class CompactedKeyWriter extends CompactedRowWriter {
     }
 
     public static ValueWriter createValueWriter(DataType fieldType) {
-        ValueWriter valueWriter = BinaryWriter.createValueWriter(fieldType);
+        ValueWriter valueWriter = BinaryWriter.createValueWriter(fieldType, 
COMPACTED);
         return (writer, pos, value) -> {
             if (value == null) {
                 throw new IllegalArgumentException(
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
index dc3e774ad..ad5d1b9e0 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
@@ -25,6 +25,8 @@ import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.array.CompactedArray;
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
@@ -320,12 +322,14 @@ public class CompactedRowReader {
                 fieldReader = (reader, pos) -> 
reader.readTimestampLtz(timestampLtzPrecision);
                 break;
             case ARRAY:
-                fieldReader = (reader, pos) -> reader.readArray();
+                DataType elementType = ((ArrayType) 
fieldType).getElementType();
+                fieldReader = (reader, pos) -> reader.readArray(elementType);
                 break;
 
             case ROW:
-                final int rowFieldCount = ((RowType) 
fieldType).getFieldCount();
-                fieldReader = (reader, pos) -> reader.readRow(rowFieldCount);
+                DataType[] nestedFieldTypes =
+                        ((RowType) fieldType).getFieldTypes().toArray(new 
DataType[0]);
+                fieldReader = (reader, pos) -> 
reader.readRow(nestedFieldTypes);
                 break;
             default:
                 throw new IllegalArgumentException("Unsupported type for 
IndexedRow: " + fieldType);
@@ -341,18 +345,19 @@ public class CompactedRowReader {
         };
     }
 
-    public InternalArray readArray() {
+    public InternalArray readArray(DataType elementType) {
         int length = readInt();
-        InternalArray array = BinarySegmentUtils.readBinaryArray(segments, 
position, length);
+        InternalArray array =
+                BinarySegmentUtils.readBinaryArray(
+                        segments, position, length, new 
CompactedArray(elementType));
         position += length;
         return array;
     }
 
-    public InternalRow readRow(int numFields) {
+    public InternalRow readRow(DataType[] nestedFieldTypes) {
         int length = readInt();
-        InternalRow row =
-                BinarySegmentUtils.readBinaryRow(
-                        segments, 0, numFields, ((long) position << 32) | 
length);
+        CompactedRow row = new CompactedRow(nestedFieldTypes);
+        row.pointTo(segments, position, length);
         position += length;
         return row;
     }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/AlignedRowEncoder.java 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/AlignedRowEncoder.java
new file mode 100644
index 000000000..0bee6f4c3
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/AlignedRowEncoder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.encode;
+
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.BinaryWriter;
+import org.apache.fluss.row.aligned.AlignedRow;
+import org.apache.fluss.row.aligned.AlignedRowWriter;
+import org.apache.fluss.types.DataType;
+
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.ALIGNED;
+
+/**
+ * A {@link RowEncoder} for {@link AlignedRow}.
+ *
+ * @since 0.9
+ */
+public class AlignedRowEncoder implements RowEncoder {
+    private final AlignedRow reuseRow;
+    private final AlignedRowWriter reuseWriter;
+    private final BinaryWriter.ValueWriter[] valueWriters;
+
+    public AlignedRowEncoder(DataType[] fieldTypes) {
+        this.reuseRow = new AlignedRow(fieldTypes.length);
+        this.reuseWriter = new AlignedRowWriter(reuseRow);
+        this.valueWriters = new BinaryWriter.ValueWriter[fieldTypes.length];
+        for (int i = 0; i < fieldTypes.length; i++) {
+            valueWriters[i] = BinaryWriter.createValueWriter(fieldTypes[i], 
ALIGNED);
+        }
+    }
+
+    @Override
+    public void startNewRow() {
+        reuseWriter.reset();
+    }
+
+    @Override
+    public void encodeField(int pos, Object value) {
+        valueWriters[pos].writeValue(reuseWriter, pos, value);
+    }
+
+    @Override
+    public BinaryRow finishRow() {
+        reuseWriter.complete();
+        return reuseRow;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // nothing to close
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java
index 76c04f777..546ce45a2 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java
@@ -24,6 +24,8 @@ import 
org.apache.fluss.row.compacted.CompactedRowDeserializer;
 import org.apache.fluss.row.compacted.CompactedRowWriter;
 import org.apache.fluss.types.DataType;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED;
+
 /**
  * A {@link RowEncoder} for {@link CompactedRow}.
  *
@@ -43,7 +45,7 @@ public class CompactedRowEncoder implements RowEncoder {
         writer = new CompactedRowWriter(fieldDataTypes.length);
         fieldWriters = new BinaryWriter.ValueWriter[fieldDataTypes.length];
         for (int i = 0; i < fieldDataTypes.length; i++) {
-            fieldWriters[i] = 
BinaryWriter.createValueWriter(fieldDataTypes[i]);
+            fieldWriters[i] = 
BinaryWriter.createValueWriter(fieldDataTypes[i], COMPACTED);
         }
         this.compactedRowDeserializer = new 
CompactedRowDeserializer(fieldDataTypes);
     }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/IndexedRowEncoder.java 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/IndexedRowEncoder.java
index 0bb552e5a..53fc9fac1 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/encode/IndexedRowEncoder.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/encode/IndexedRowEncoder.java
@@ -24,6 +24,8 @@ import org.apache.fluss.row.indexed.IndexedRowWriter;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED;
+
 /**
  * A {@link RowEncoder} for {@link IndexedRow}.
  *
@@ -46,7 +48,7 @@ public class IndexedRowEncoder implements RowEncoder {
         this.fieldWriters = new 
BinaryWriter.ValueWriter[fieldDataTypes.length];
         this.rowWriter = new IndexedRowWriter(fieldDataTypes);
         for (int i = 0; i < fieldDataTypes.length; i++) {
-            fieldWriters[i] = 
BinaryWriter.createValueWriter(fieldDataTypes[i]);
+            fieldWriters[i] = 
BinaryWriter.createValueWriter(fieldDataTypes[i], INDEXED);
         }
     }
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java 
b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java
index cf4fd4cb0..a42fe1639 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java
@@ -30,12 +30,14 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.NullAwareGetters;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.array.IndexedArray;
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.BinaryType;
 import org.apache.fluss.types.CharType;
 import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.DataTypeRoot;
 import org.apache.fluss.types.DecimalType;
 import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.RowType;
 import org.apache.fluss.types.StringType;
 import org.apache.fluss.utils.MurmurHashUtils;
 
@@ -237,7 +239,7 @@ public class IndexedRow implements BinaryRow, 
NullAwareGetters {
         BinaryWriter.ValueWriter[] writers = new 
BinaryWriter.ValueWriter[newType.length];
         for (int i = 0; i < newType.length; i++) {
             fieldGetter[i] = InternalRow.createFieldGetter(newType[i], 
fields[i]);
-            writers[i] = BinaryWriter.createValueWriter(newType[i]);
+            writers[i] = BinaryWriter.createValueWriter(newType[i], 
BinaryRowFormat.INDEXED);
         }
 
         IndexedRow projectRow = new IndexedRow(newType);
@@ -384,7 +386,15 @@ public class IndexedRow implements BinaryRow, 
NullAwareGetters {
         int offset = getFieldOffset(pos);
         int length = columnLengths[pos];
         long offsetAndLength = ((long) offset << 32) | length;
-        return BinarySegmentUtils.readBinaryArray(segments, 0, 
offsetAndLength);
+        DataType fieldType = fieldTypes[pos];
+        if (fieldType instanceof ArrayType) {
+            DataType elementType = ((ArrayType) fieldType).getElementType();
+            return BinarySegmentUtils.readBinaryArray(
+                    segments, 0, offsetAndLength, new 
IndexedArray(elementType));
+        } else {
+            throw new IllegalStateException(
+                    "Field type at position " + pos + " is not ArrayType: " + 
fieldType);
+        }
     }
 
     // TODO: getMap() will be added in Issue #1973
@@ -392,16 +402,19 @@ public class IndexedRow implements BinaryRow, 
NullAwareGetters {
     @Override
     public InternalRow getRow(int pos, int numFields) {
         assertIndexIsValid(pos);
-        int index = getFieldOffset(pos);
+        int offset = getFieldOffset(pos);
         int length = columnLengths[pos];
-        long offsetAndLength = ((long) index << 32) | length;
-        return BinarySegmentUtils.readBinaryRow(segments, 0, numFields, 
offsetAndLength);
-    }
-
-    private DataType[] getDataTypes(int pos) {
-        return (fieldTypes[pos].getTypeRoot() == DataTypeRoot.ROW)
-                ? fieldTypes[pos].getChildren().toArray(new DataType[0])
-                : fieldTypes;
+        long offsetAndLength = ((long) offset << 32) | length;
+        DataType fieldType = fieldTypes[pos];
+        if (fieldType instanceof RowType) {
+            DataType[] nestedFieldTypes =
+                    ((RowType) fieldType).getFieldTypes().toArray(new 
DataType[0]);
+            return BinarySegmentUtils.readIndexedRow(
+                    segments, 0, offsetAndLength, nestedFieldTypes);
+        } else {
+            throw new IllegalStateException(
+                    "Field type at position " + pos + " is not RowType: " + 
fieldType);
+        }
     }
 
     private void assertIndexIsValid(int index) {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java 
b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java
index 6bc8f9ac3..43f70cbfb 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java
@@ -26,6 +26,8 @@ import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.array.IndexedArray;
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
@@ -202,18 +204,21 @@ public class IndexedRowReader {
         return Arrays.copyOfRange(bytes, 0, newLen);
     }
 
-    public InternalArray readArray() {
+    public InternalArray readArray(DataType elementType) {
         int length = readVarLengthFromVarLengthList();
         MemorySegment[] segments = new MemorySegment[] {segment};
-        InternalArray array = BinarySegmentUtils.readBinaryArray(segments, 
position, length);
+        InternalArray array =
+                BinarySegmentUtils.readBinaryArray(
+                        segments, position, length, new 
IndexedArray(elementType));
         position += length;
         return array;
     }
 
-    public InternalRow readRow(int numFields) {
+    public InternalRow readRow(DataType[] nestedFieldTypes) {
         int length = readVarLengthFromVarLengthList();
         MemorySegment[] segments = new MemorySegment[] {segment};
-        InternalRow row = BinarySegmentUtils.readBinaryRow(segments, position, 
numFields, length);
+        InternalRow row =
+                BinarySegmentUtils.readIndexedRow(segments, position, length, 
nestedFieldTypes);
         position += length;
         return row;
     }
@@ -278,11 +283,13 @@ public class IndexedRowReader {
                 fieldReader = (reader, pos) -> 
reader.readTimestampLtz(timestampLtzPrecision);
                 break;
             case ARRAY:
-                fieldReader = (reader, pos) -> reader.readArray();
+                DataType elementType = ((ArrayType) 
fieldType).getElementType();
+                fieldReader = (reader, pos) -> reader.readArray(elementType);
                 break;
             case ROW:
-                final int rowFieldCount = ((RowType) 
fieldType).getFieldCount();
-                fieldReader = (reader, pos) -> reader.readRow(rowFieldCount);
+                DataType[] nestedFieldTypes =
+                        ((RowType) fieldType).getFieldTypes().toArray(new 
DataType[0]);
+                fieldReader = (reader, pos) -> 
reader.readRow(nestedFieldTypes);
                 break;
             case MAP:
                 // TODO: Map type support will be added in Issue #1973
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java 
b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java
index ec88bb142..12fb2f090 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java
@@ -33,15 +33,12 @@ import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.row.serializer.ArraySerializer;
 import org.apache.fluss.row.serializer.RowSerializer;
-import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.DataTypeChecks;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.UnsafeUtils;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.util.Arrays;
 
 /** Writer for {@link IndexedRow}. */
@@ -326,103 +323,6 @@ public class IndexedRowWriter extends OutputStream
 
     // 
------------------------------------------------------------------------------------------
 
-    /**
-     * Creates an accessor for writing the elements of an indexed row writer 
during runtime.
-     *
-     * @param fieldType the field type of the indexed row
-     */
-    public static FieldWriter createFieldWriter(DataType fieldType) {
-        final FieldWriter fieldWriter;
-        switch (fieldType.getTypeRoot()) {
-            case CHAR:
-                final int charLength = DataTypeChecks.getLength(fieldType);
-                fieldWriter =
-                        (writer, pos, value) -> 
writer.writeChar((BinaryString) value, charLength);
-                break;
-            case STRING:
-                fieldWriter = (writer, pos, value) -> 
writer.writeString((BinaryString) value);
-                break;
-            case BOOLEAN:
-                fieldWriter = (writer, pos, value) -> 
writer.writeBoolean((boolean) value);
-                break;
-            case BINARY:
-                final int binaryLength = DataTypeChecks.getLength(fieldType);
-                fieldWriter =
-                        (writer, pos, value) -> writer.writeBinary((byte[]) 
value, binaryLength);
-                break;
-            case BYTES:
-                fieldWriter = (writer, pos, value) -> 
writer.writeBytes((byte[]) value);
-                break;
-            case DECIMAL:
-                final int decimalPrecision = 
DataTypeChecks.getPrecision(fieldType);
-                fieldWriter =
-                        (writer, pos, value) ->
-                                writer.writeDecimal((Decimal) value, 
decimalPrecision);
-                break;
-            case TINYINT:
-                fieldWriter = (writer, pos, value) -> writer.writeByte((byte) 
value);
-                break;
-            case SMALLINT:
-                fieldWriter = (writer, pos, value) -> 
writer.writeShort((short) value);
-                break;
-            case INTEGER:
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-                fieldWriter = (writer, pos, value) -> writer.writeInt((int) 
value);
-                break;
-            case BIGINT:
-                fieldWriter = (writer, pos, value) -> writer.writeLong((long) 
value);
-                break;
-            case FLOAT:
-                fieldWriter = (writer, pos, value) -> 
writer.writeFloat((float) value);
-                break;
-            case DOUBLE:
-                fieldWriter = (writer, pos, value) -> 
writer.writeDouble((double) value);
-                break;
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                final int timestampNtzPrecision = 
DataTypeChecks.getPrecision(fieldType);
-                fieldWriter =
-                        (writer, pos, value) ->
-                                writer.writeTimestampNtz(
-                                        (TimestampNtz) value, 
timestampNtzPrecision);
-                break;
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                final int timestampLtzPrecision = 
DataTypeChecks.getPrecision(fieldType);
-                fieldWriter =
-                        (writer, pos, value) ->
-                                writer.writeTimestampLtz(
-                                        (TimestampLtz) value, 
timestampLtzPrecision);
-                break;
-            case ARRAY:
-                final ArraySerializer arraySerializer =
-                        new ArraySerializer(((ArrayType) 
fieldType).getElementType());
-                fieldWriter =
-                        (writer, pos, value) ->
-                                writer.writeArray((InternalArray) value, 
arraySerializer);
-                break;
-            case ROW:
-                final DataType[] rowFieldTypes =
-                        ((RowType) fieldType).getFieldTypes().toArray(new 
DataType[0]);
-                final RowSerializer rowSerializer = new 
RowSerializer(rowFieldTypes);
-                fieldWriter =
-                        (writer, pos, value) -> writer.writeRow((InternalRow) 
value, rowSerializer);
-                break;
-            default:
-                throw new IllegalArgumentException("Unsupported type for 
IndexedRow: " + fieldType);
-        }
-
-        if (!fieldType.isNullable()) {
-            return fieldWriter;
-        }
-        return (writer, pos, value) -> {
-            if (value == null) {
-                writer.setNullAt(pos);
-            } else {
-                fieldWriter.writeField(writer, pos, value);
-            }
-        };
-    }
-
     public static void serializeIndexedRow(IndexedRow row, OutputView target) 
throws IOException {
         int sizeInBytes = row.getSizeInBytes();
         if (target instanceof MemorySegmentWritable) {
@@ -433,9 +333,4 @@ public class IndexedRowWriter extends OutputStream
             target.write(bytes, 0, sizeInBytes);
         }
     }
-
-    /** Accessor for writing the elements of an indexed row writer during 
runtime. */
-    public interface FieldWriter extends Serializable {
-        void writeField(IndexedRowWriter writer, int pos, Object value);
-    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java
index ecfdcab0d..1b24d6e5c 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java
@@ -19,9 +19,13 @@ package org.apache.fluss.row.serializer;
 
 import org.apache.fluss.row.BinaryArray;
 import org.apache.fluss.row.BinaryArrayWriter;
+import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
 import org.apache.fluss.row.BinaryWriter;
 import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.array.AlignedArray;
+import org.apache.fluss.row.array.CompactedArray;
+import org.apache.fluss.row.array.IndexedArray;
 import org.apache.fluss.types.DataType;
 
 import java.io.Serializable;
@@ -31,11 +35,13 @@ public class ArraySerializer implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final DataType eleType;
+    private final BinaryRowFormat rowFormat;
 
     private transient BinaryArraySerializer alignedSerializer;
 
-    public ArraySerializer(DataType eleType) {
+    public ArraySerializer(DataType eleType, BinaryRowFormat rowFormat) {
         this.eleType = eleType;
+        this.rowFormat = rowFormat;
     }
 
     public BinaryArray toBinaryArray(InternalArray from) {
@@ -45,6 +51,19 @@ public class ArraySerializer implements Serializable {
         return alignedSerializer.toAlignedArray(from);
     }
 
+    private BinaryArray createBinaryArrayInstance() {
+        switch (rowFormat) {
+            case COMPACTED:
+                return new CompactedArray(eleType);
+            case INDEXED:
+                return new IndexedArray(eleType);
+            case ALIGNED:
+                return new AlignedArray();
+            default:
+                throw new IllegalArgumentException("Unsupported row format: " 
+ rowFormat);
+        }
+    }
+
     // 
------------------------------------------------------------------------------------------
 
     /** Serializer function for AlignedArray. */
@@ -86,7 +105,7 @@ public class ArraySerializer implements Serializable {
 
             int numElements = from.size();
             if (reuseArray == null) {
-                reuseArray = new BinaryArray();
+                reuseArray = createBinaryArrayInstance();
             }
             if (reuseWriter == null || reuseWriter.getNumElements() != 
numElements) {
                 reuseWriter =
@@ -101,7 +120,7 @@ public class ArraySerializer implements Serializable {
                 elementGetter = InternalArray.createElementGetter(eleType);
             }
             if (valueWriter == null) {
-                valueWriter = BinaryWriter.createValueWriter(eleType);
+                valueWriter = BinaryWriter.createValueWriter(eleType, 
rowFormat);
             }
 
             for (int i = 0; i < numElements; i++) {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/serializer/RowSerializer.java 
b/fluss-common/src/main/java/org/apache/fluss/row/serializer/RowSerializer.java
index 34d9f0185..5e2d02e60 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/serializer/RowSerializer.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/serializer/RowSerializer.java
@@ -18,10 +18,12 @@
 package org.apache.fluss.row.serializer;
 
 import org.apache.fluss.row.BinaryRow;
-import org.apache.fluss.row.BinaryWriter;
+import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
 import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.aligned.AlignedRow;
-import org.apache.fluss.row.aligned.AlignedRowWriter;
+import org.apache.fluss.row.encode.AlignedRowEncoder;
+import org.apache.fluss.row.encode.CompactedRowEncoder;
+import org.apache.fluss.row.encode.IndexedRowEncoder;
+import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.types.DataType;
 
 import java.io.Serializable;
@@ -31,44 +33,69 @@ public class RowSerializer implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final DataType[] fieldTypes;
+    private final BinaryRowFormat format;
 
-    private transient AlignedRow reuseRow;
-    private transient AlignedRowWriter reuseWriter;
-    private transient BinaryWriter.ValueWriter[] valueWriters;
+    private transient BinaryRowSerializer serializer;
 
-    public RowSerializer(DataType[] fieldTypes) {
+    public RowSerializer(DataType[] fieldTypes, BinaryRowFormat format) {
         this.fieldTypes = fieldTypes;
+        this.format = format;
     }
 
+    /**
+     * Serialize the given {@link InternalRow} into {@link BinaryRow}.
+     *
+     * <p>The returned {@link BinaryRow} might reuse the memory from the input 
{@link InternalRow}
+     * if it is already a {@link BinaryRow}.
+     *
+     * <p>Otherwise, it will serialize a {@link BinaryRow} based on the 
specified {@link
+     * BinaryRowFormat}.
+     */
     public BinaryRow toBinaryRow(InternalRow from) {
         if (from instanceof BinaryRow) {
             return (BinaryRow) from;
         }
 
-        int numFields = from.getFieldCount();
-        if (reuseRow == null || reuseRow.getFieldCount() != numFields) {
-            reuseRow = new AlignedRow(numFields);
-            reuseWriter = new AlignedRowWriter(reuseRow);
-        } else {
-            reuseWriter.reset();
+        if (serializer == null) {
+            serializer = new BinaryRowSerializer(fieldTypes, format);
         }
-        if (valueWriters == null || valueWriters.length != numFields) {
-            valueWriters = new BinaryWriter.ValueWriter[numFields];
-            for (int i = 0; i < numFields; i++) {
-                valueWriters[i] = 
BinaryWriter.createValueWriter(fieldTypes[i]);
+        return serializer.toBinaryRow(from);
+    }
+
+    /**
+     * Serializer function for BinaryRow, it delegates the actual encoding to 
different {@link
+     * RowEncoder} based on the specified format.
+     */
+    private static class BinaryRowSerializer {
+        private final RowEncoder rowEncoder;
+        private final InternalRow.FieldGetter[] fieldGetters;
+
+        private BinaryRowSerializer(DataType[] fieldTypes, BinaryRowFormat 
format) {
+            this.fieldGetters = new InternalRow.FieldGetter[fieldTypes.length];
+            for (int i = 0; i < fieldTypes.length; i++) {
+                this.fieldGetters[i] = 
InternalRow.createFieldGetter(fieldTypes[i], i);
+            }
+            switch (format) {
+                case COMPACTED:
+                    this.rowEncoder = new CompactedRowEncoder(fieldTypes);
+                    break;
+                case INDEXED:
+                    this.rowEncoder = new IndexedRowEncoder(fieldTypes);
+                    break;
+                case ALIGNED:
+                    this.rowEncoder = new AlignedRowEncoder(fieldTypes);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported binary row 
format: " + format);
             }
         }
 
-        for (int i = 0; i < numFields; i++) {
-            if (from.isNullAt(i)) {
-                reuseWriter.setNullAt(i);
-            } else {
-                Object field = InternalRow.createFieldGetter(fieldTypes[i], 
i).getFieldOrNull(from);
-                valueWriters[i].writeValue(reuseWriter, i, field);
+        public BinaryRow toBinaryRow(InternalRow from) {
+            rowEncoder.startNewRow();
+            for (int i = 0; i < fieldGetters.length; i++) {
+                rowEncoder.encodeField(i, 
fieldGetters[i].getFieldOrNull(from));
             }
+            return rowEncoder.finishRow();
         }
-        reuseWriter.complete();
-
-        return reuseRow;
     }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
index fb91c32d2..956ae045d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
@@ -58,6 +58,7 @@ import org.apache.fluss.row.arrow.writers.ArrowTinyIntWriter;
 import org.apache.fluss.row.arrow.writers.ArrowVarBinaryWriter;
 import org.apache.fluss.row.arrow.writers.ArrowVarCharWriter;
 import org.apache.fluss.row.columnar.ColumnVector;
+import org.apache.fluss.row.columnar.VectorizedColumnBatch;
 import org.apache.fluss.shaded.arrow.com.google.flatbuffers.FlatBufferBuilder;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.MessageHeader;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.flatbuf.RecordBatch;
@@ -355,7 +356,7 @@ public class ArrowUtils {
         }
     }
 
-    public static ColumnVector createArrowColumnVector(ValueVector vector, 
DataType dataType) {
+    private static ColumnVector createArrowColumnVector(ValueVector vector, 
DataType dataType) {
         if (vector instanceof TinyIntVector) {
             return new ArrowTinyIntColumnVector((TinyIntVector) vector);
         } else if (vector instanceof SmallIntVector) {
@@ -401,7 +402,15 @@ public class ArrowUtils {
 
         } else if (vector instanceof StructVector && dataType instanceof 
RowType) {
             RowType rowType = (RowType) dataType;
-            return new ArrowRowColumnVector((FieldVector) vector, rowType);
+            StructVector structVector = (StructVector) vector;
+            List<FieldVector> fieldVectors = 
structVector.getChildrenFromFields();
+            ColumnVector[] columnVectors = new 
ColumnVector[fieldVectors.size()];
+            for (int i = 0; i < fieldVectors.size(); i++) {
+                columnVectors[i] =
+                        ArrowUtils.createArrowColumnVector(
+                                fieldVectors.get(i), rowType.getTypeAt(i));
+            }
+            return new ArrowRowColumnVector(structVector, new 
VectorizedColumnBatch(columnVectors));
         } else {
             throw new UnsupportedOperationException(
                     String.format("Unsupported type %s.", dataType));
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java
index 5f445c92a..14230b62e 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.row;
 
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
@@ -103,7 +104,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testWriteAndReadInt() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeInt(0, 10);
@@ -119,7 +120,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testWriteAndReadString() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         writer.writeString(0, BinaryString.fromString("hello"));
@@ -133,7 +134,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNull() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8);
 
         writer.writeLong(0, 100L);
@@ -151,7 +152,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetAndGetDecimal() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         Decimal decimal1 = Decimal.fromUnscaledLong(123, 5, 2);
@@ -168,7 +169,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetAndGetTimestampNtz() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampNtz ts1 = TimestampNtz.fromMillis(1000L);
@@ -185,7 +186,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetAndGetTimestampLtz() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L);
@@ -202,7 +203,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetAndGetBinary() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         byte[] binary1 = {1, 2, 3};
@@ -217,32 +218,6 @@ public class BinaryArrayTest {
         assertThat(array.getBinary(1, 3)).isEqualTo(binary2);
     }
 
-    @Test
-    public void testCopy() {
-        int[] intArray = {1, 2, 3, 4, 5};
-        BinaryArray original = BinaryArray.fromPrimitiveArray(intArray);
-
-        BinaryArray copied = original.copy();
-
-        assertThat(copied.size()).isEqualTo(original.size());
-        assertThat(copied.getInt(0)).isEqualTo(1);
-        assertThat(copied.getInt(4)).isEqualTo(5);
-    }
-
-    @Test
-    public void testCopyWithReuse() {
-        int[] intArray = {1, 2, 3};
-        BinaryArray original = BinaryArray.fromPrimitiveArray(intArray);
-
-        BinaryArray reuse = new BinaryArray();
-        BinaryArray copied = original.copy(reuse);
-
-        assertThat(copied).isSameAs(reuse);
-        assertThat(copied.size()).isEqualTo(3);
-        assertThat(copied.getInt(0)).isEqualTo(1);
-        assertThat(copied.getInt(2)).isEqualTo(3);
-    }
-
     @Test
     public void testHashCode() {
         int[] intArray = {1, 2, 3};
@@ -285,7 +260,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testAnyNull() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeInt(0, 10);
@@ -304,7 +279,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testToArrayWithNullThrowsException() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeInt(0, 10);
@@ -327,7 +302,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testToObjectArrayWithNull() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeInt(0, 10);
@@ -341,7 +316,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testGetChar() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         writer.writeString(0, BinaryString.fromString("hello"));
@@ -354,7 +329,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetBoolean() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 1);
 
         writer.writeBoolean(0, true);
@@ -369,7 +344,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetByte() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 1);
 
         writer.writeByte(0, (byte) 1);
@@ -384,7 +359,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetShort() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 2);
 
         writer.writeShort(0, (short) 10);
@@ -399,7 +374,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetFloat() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeFloat(0, 1.5f);
@@ -414,7 +389,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetDouble() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8);
 
         writer.writeDouble(0, 1.1);
@@ -454,7 +429,7 @@ public class BinaryArrayTest {
         int[] intArray = {1, 2, 3};
         BinaryArray array1 = BinaryArray.fromPrimitiveArray(intArray);
 
-        BinaryArray array2 = new BinaryArray();
+        BinaryArray array2 = new PrimitiveBinaryArray();
         array2.pointTo(array1.getSegments(), array1.getOffset(), 
array1.getSizeInBytes());
 
         assertThat(array2.size()).isEqualTo(3);
@@ -465,7 +440,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testHighPrecisionTimestamp() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampNtz ts1 = TimestampNtz.fromMillis(1000L, 123456);
@@ -481,7 +456,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testHighPrecisionTimestampLtz() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L, 123456);
@@ -497,7 +472,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testLargeDecimal() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         Decimal decimal1 = Decimal.fromBigDecimal(new 
java.math.BigDecimal("123.456"), 20, 3);
@@ -513,7 +488,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNotNullAt() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8);
 
         writer.setNullLong(0);
@@ -529,7 +504,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNullAt() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8);
 
         writer.writeLong(0, 100L);
@@ -567,7 +542,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetDecimalCompact() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         Decimal decimal1 = Decimal.fromUnscaledLong(123, 5, 2);
@@ -586,7 +561,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetTimestampNtzCompact() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampNtz ts1 = TimestampNtz.fromMillis(1000L);
@@ -605,7 +580,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetTimestampLtzCompact() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L);
@@ -624,7 +599,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testToObjectArrayWithNulls() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeInt(0, 100);
@@ -683,7 +658,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNullLong() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
         writer.setNullLong(0);
         writer.writeLong(1, 100L);
@@ -695,7 +670,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNullInt() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
         writer.setNullInt(0);
         writer.writeInt(1, 100);
@@ -707,7 +682,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNullFloat() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
         writer.setNullFloat(0);
         writer.writeFloat(1, 3.14f);
@@ -719,7 +694,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNullDouble() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
         writer.setNullDouble(0);
         writer.writeDouble(1, 3.14159);
@@ -731,7 +706,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNullBoolean() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
         writer.setNullBoolean(0);
         writer.writeBoolean(1, true);
@@ -743,7 +718,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNullByte() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
         writer.setNullByte(0);
         writer.writeByte(1, (byte) 42);
@@ -755,7 +730,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetNullShort() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2);
         writer.setNullShort(0);
         writer.writeShort(1, (short) 123);
@@ -767,7 +742,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testGetString() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
         writer.writeString(0, BinaryString.fromString("test"));
         writer.complete();
@@ -777,7 +752,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testGetBytes() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
         byte[] bytes = {1, 2, 3, 4, 5};
         writer.writeBinary(0, bytes, 10);
@@ -807,18 +782,6 @@ public class BinaryArrayTest {
         assertThat(size).isGreaterThan(0);
     }
 
-    @Test
-    public void testCopyWithMultipleTypes() {
-        BinaryArray array = BinaryArray.fromPrimitiveArray(new int[] {1, 2, 3, 
4, 5});
-
-        BinaryArray copied = array.copy();
-
-        assertThat(copied).isNotSameAs(array);
-        assertThat(copied.size()).isEqualTo(5);
-        assertThat(copied.getInt(0)).isEqualTo(1);
-        assertThat(copied.getInt(4)).isEqualTo(5);
-    }
-
     @Test
     public void testHashCodeConsistency() {
         BinaryArray array1 = BinaryArray.fromPrimitiveArray(new int[] {1, 2, 
3});
@@ -829,7 +792,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetDecimalNonCompact() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         Decimal largeDecimal =
@@ -849,7 +812,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetTimestampNtzNonCompact() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampNtz ts1 = TimestampNtz.fromMillis(1000L, 123456);
@@ -868,7 +831,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testSetTimestampLtzNonCompact() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L, 123456);
@@ -919,7 +882,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testGetBinaryWithLength() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
         byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
         writer.writeBytes(0, data);
@@ -931,7 +894,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testGetDecimalNonCompact() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
 
         Decimal largeDecimal =
@@ -945,7 +908,7 @@ public class BinaryArrayTest {
 
     @Test
     public void testGetTimestampNonCompact() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampNtz tsNtz = TimestampNtz.fromMillis(1000L, 123456);
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayWriterTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayWriterTest.java
index 466346f98..a77367a47 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayWriterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayWriterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.row;
 
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
@@ -28,7 +29,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteAndReadAllTypes() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 10, 8);
 
         writer.writeBoolean(0, true);
@@ -57,7 +58,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testReset() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeInt(0, 10);
@@ -80,7 +81,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testSetNullMethods() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 8, 8);
 
         writer.setNullBoolean(0);
@@ -108,7 +109,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testSetOffsetAndSize() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         writer.setOffsetAndSize(0, 100, 200);
@@ -126,7 +127,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testGetFieldOffset() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 5, 8);
 
         int fieldOffset0 = writer.getFieldOffset(0);
@@ -139,7 +140,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testGetNumElements() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 5, 4);
 
         assertThat(writer.getNumElements()).isEqualTo(5);
@@ -147,7 +148,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteChar() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         writer.writeChar(0, BinaryString.fromString("hello"), 5);
@@ -160,7 +161,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteBinary() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         byte[] binary1 = {1, 2, 3, 4};
@@ -176,7 +177,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteTimestampNtz() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampNtz ts1 = TimestampNtz.fromMillis(1000L, 123456);
@@ -192,7 +193,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteTimestampLtz() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         TimestampLtz ts1 = TimestampLtz.fromEpochMillis(1000L, 123456);
@@ -208,7 +209,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteNaNFloat() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeFloat(0, Float.NaN);
@@ -223,7 +224,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteNaNDouble() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8);
 
         writer.writeDouble(0, Double.NaN);
@@ -238,7 +239,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testAfterGrow() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
         writer.writeLong(0, 100L);
@@ -251,7 +252,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testCreateNullSetter() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 10, 8);
 
         BinaryArrayWriter.NullSetter booleanSetter =
@@ -314,7 +315,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteLargeString() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
 
         String largeString = new String(new char[100]).replace('\0', 'a');
@@ -326,7 +327,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteSmallString() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
 
         String smallString = "ab";
@@ -338,7 +339,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteLargeBinary() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
 
         byte[] largeBinary = new byte[100];
@@ -354,7 +355,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testSetNullBit() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 5, 4);
 
         writer.setNullBit(0);
@@ -371,7 +372,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testMultipleResetCycles() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
 
         for (int cycle = 0; cycle < 5; cycle++) {
@@ -390,7 +391,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteNullDecimal() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
 
         writer.writeDecimal(0, null, 20);
@@ -401,7 +402,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteNullTimestampNtz() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
 
         writer.writeTimestampNtz(0, null, 9);
@@ -412,7 +413,7 @@ public class BinaryArrayWriterTest {
 
     @Test
     public void testWriteNullTimestampLtz() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 1, 8);
 
         writer.writeTimestampLtz(0, null, 9);
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java
index d2501d6ca..d356eaaa8 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/BinaryWriterTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.fluss.row;
 
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
+import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
@@ -29,28 +31,24 @@ public class BinaryWriterTest {
 
     @Test
     public void testCreateValueSetterForAllTypes() {
-        BinaryWriter.ValueWriter booleanSetter =
-                BinaryWriter.createValueWriter(DataTypes.BOOLEAN());
-        BinaryWriter.ValueWriter tinyintSetter =
-                BinaryWriter.createValueWriter(DataTypes.TINYINT());
-        BinaryWriter.ValueWriter smallintSetter =
-                BinaryWriter.createValueWriter(DataTypes.SMALLINT());
-        BinaryWriter.ValueWriter intSetter = 
BinaryWriter.createValueWriter(DataTypes.INT());
-        BinaryWriter.ValueWriter bigintSetter = 
BinaryWriter.createValueWriter(DataTypes.BIGINT());
-        BinaryWriter.ValueWriter floatSetter = 
BinaryWriter.createValueWriter(DataTypes.FLOAT());
-        BinaryWriter.ValueWriter doubleSetter = 
BinaryWriter.createValueWriter(DataTypes.DOUBLE());
-        BinaryWriter.ValueWriter stringSetter = 
BinaryWriter.createValueWriter(DataTypes.STRING());
-        BinaryWriter.ValueWriter charSetter = 
BinaryWriter.createValueWriter(DataTypes.CHAR(10));
-        BinaryWriter.ValueWriter binarySetter =
-                BinaryWriter.createValueWriter(DataTypes.BINARY(10));
+        BinaryWriter.ValueWriter booleanSetter = 
createPrimitiveValueWriter(DataTypes.BOOLEAN());
+        BinaryWriter.ValueWriter tinyintSetter = 
createPrimitiveValueWriter(DataTypes.TINYINT());
+        BinaryWriter.ValueWriter smallintSetter = 
createPrimitiveValueWriter(DataTypes.SMALLINT());
+        BinaryWriter.ValueWriter intSetter = 
createPrimitiveValueWriter(DataTypes.INT());
+        BinaryWriter.ValueWriter bigintSetter = 
createPrimitiveValueWriter(DataTypes.BIGINT());
+        BinaryWriter.ValueWriter floatSetter = 
createPrimitiveValueWriter(DataTypes.FLOAT());
+        BinaryWriter.ValueWriter doubleSetter = 
createPrimitiveValueWriter(DataTypes.DOUBLE());
+        BinaryWriter.ValueWriter stringSetter = 
createPrimitiveValueWriter(DataTypes.STRING());
+        BinaryWriter.ValueWriter charSetter = 
createPrimitiveValueWriter(DataTypes.CHAR(10));
+        BinaryWriter.ValueWriter binarySetter = 
createPrimitiveValueWriter(DataTypes.BINARY(10));
         BinaryWriter.ValueWriter decimalSetter =
-                BinaryWriter.createValueWriter(DataTypes.DECIMAL(5, 2));
+                createPrimitiveValueWriter(DataTypes.DECIMAL(5, 2));
         BinaryWriter.ValueWriter timestampNtzSetter =
-                BinaryWriter.createValueWriter(DataTypes.TIMESTAMP(3));
+                createPrimitiveValueWriter(DataTypes.TIMESTAMP(3));
         BinaryWriter.ValueWriter timestampLtzSetter =
-                BinaryWriter.createValueWriter(DataTypes.TIMESTAMP_LTZ(3));
-        BinaryWriter.ValueWriter dateSetter = 
BinaryWriter.createValueWriter(DataTypes.DATE());
-        BinaryWriter.ValueWriter timeSetter = 
BinaryWriter.createValueWriter(DataTypes.TIME());
+                createPrimitiveValueWriter(DataTypes.TIMESTAMP_LTZ(3));
+        BinaryWriter.ValueWriter dateSetter = 
createPrimitiveValueWriter(DataTypes.DATE());
+        BinaryWriter.ValueWriter timeSetter = 
createPrimitiveValueWriter(DataTypes.TIME());
 
         assertThat(booleanSetter).isNotNull();
         assertThat(tinyintSetter).isNotNull();
@@ -71,10 +69,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithBooleanType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.BOOLEAN());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.BOOLEAN());
         setter.writeValue(writer, 0, true);
         setter.writeValue(writer, 1, false);
         writer.complete();
@@ -85,10 +83,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithIntType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.INT());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.INT());
         setter.writeValue(writer, 0, 100);
         setter.writeValue(writer, 1, 200);
         writer.complete();
@@ -99,10 +97,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithStringType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.STRING());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.STRING());
         setter.writeValue(writer, 0, BinaryString.fromString("hello"));
         setter.writeValue(writer, 1, BinaryString.fromString("world"));
         writer.complete();
@@ -113,10 +111,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithDecimalType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.DECIMAL(5, 2));
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.DECIMAL(5, 2));
         setter.writeValue(writer, 0, Decimal.fromUnscaledLong(123, 5, 2));
         setter.writeValue(writer, 1, Decimal.fromUnscaledLong(456, 5, 2));
         writer.complete();
@@ -129,7 +127,7 @@ public class BinaryWriterTest {
     public void testCreateValueSetterForMapThrowsException() {
         assertThatThrownBy(
                         () ->
-                                BinaryWriter.createValueWriter(
+                                createPrimitiveValueWriter(
                                         DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Map type is not supported yet");
@@ -137,10 +135,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithByteType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.TINYINT());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.TINYINT());
         setter.writeValue(writer, 0, (byte) 10);
         setter.writeValue(writer, 1, (byte) 20);
         writer.complete();
@@ -151,10 +149,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithShortType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.SMALLINT());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.SMALLINT());
         setter.writeValue(writer, 0, (short) 100);
         setter.writeValue(writer, 1, (short) 200);
         writer.complete();
@@ -165,10 +163,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithLongType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.BIGINT());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.BIGINT());
         setter.writeValue(writer, 0, 1000L);
         setter.writeValue(writer, 1, 2000L);
         writer.complete();
@@ -179,10 +177,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithFloatType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.FLOAT());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.FLOAT());
         setter.writeValue(writer, 0, 1.5f);
         setter.writeValue(writer, 1, 2.5f);
         writer.complete();
@@ -193,10 +191,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithDoubleType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.DOUBLE());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.DOUBLE());
         setter.writeValue(writer, 0, 1.1);
         setter.writeValue(writer, 1, 2.2);
         writer.complete();
@@ -207,10 +205,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithCharType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.CHAR(5));
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.CHAR(5));
         setter.writeValue(writer, 0, BinaryString.fromString("hello"));
         setter.writeValue(writer, 1, BinaryString.fromString("world"));
         writer.complete();
@@ -221,10 +219,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithBinaryType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.BINARY(3));
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.BINARY(3));
         setter.writeValue(writer, 0, new byte[] {1, 2, 3});
         setter.writeValue(writer, 1, new byte[] {4, 5, 6});
         writer.complete();
@@ -235,10 +233,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithTimestampNtzType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.TIMESTAMP(3));
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.TIMESTAMP(3));
         setter.writeValue(writer, 0, TimestampNtz.fromMillis(1000L));
         setter.writeValue(writer, 1, TimestampNtz.fromMillis(2000L));
         writer.complete();
@@ -249,11 +247,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithTimestampLtzType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 
-        BinaryWriter.ValueWriter setter =
-                BinaryWriter.createValueWriter(DataTypes.TIMESTAMP_LTZ(3));
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.TIMESTAMP_LTZ(3));
         setter.writeValue(writer, 0, TimestampLtz.fromEpochMillis(1000L));
         setter.writeValue(writer, 1, TimestampLtz.fromEpochMillis(2000L));
         writer.complete();
@@ -264,10 +261,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithDateType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.DATE());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.DATE());
         setter.writeValue(writer, 0, 18000);
         setter.writeValue(writer, 1, 18001);
         writer.complete();
@@ -278,10 +275,10 @@ public class BinaryWriterTest {
 
     @Test
     public void testValueSetterWithTimeType() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
 
-        BinaryWriter.ValueWriter setter = 
BinaryWriter.createValueWriter(DataTypes.TIME());
+        BinaryWriter.ValueWriter setter = 
createPrimitiveValueWriter(DataTypes.TIME());
         setter.writeValue(writer, 0, 3600000);
         setter.writeValue(writer, 1, 7200000);
         writer.complete();
@@ -289,4 +286,9 @@ public class BinaryWriterTest {
         assertThat(array.getInt(0)).isEqualTo(3600000);
         assertThat(array.getInt(1)).isEqualTo(7200000);
     }
+
+    private static BinaryWriter.ValueWriter 
createPrimitiveValueWriter(DataType elementType) {
+        // use null for row format if there is no nested row type
+        return BinaryWriter.createValueWriter(elementType, null);
+    }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayTest.java
index b98ab932b..95e803b20 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.row;
 
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.Test;
@@ -233,7 +234,7 @@ public class InternalArrayTest {
 
     @Test
     public void testBinaryArrayWithNullElementGetter() {
-        BinaryArray array = new BinaryArray();
+        BinaryArray array = new PrimitiveBinaryArray();
         BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
 
         writer.writeInt(0, 10);
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java 
b/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java
index f1c0a3bfe..d6d1b6a15 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java
@@ -32,6 +32,7 @@ import java.time.LocalDate;
 import java.time.LocalTime;
 import java.util.Random;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED;
 import static org.apache.fluss.row.BinaryString.fromString;
 
 /** Test all types and generate test internal row. */
@@ -78,7 +79,7 @@ public class TestInternalRowGenerator {
 
         BinaryWriter.ValueWriter[] writers = new 
BinaryWriter.ValueWriter[dataTypes.length];
         for (int i = 0; i < dataTypes.length; i++) {
-            writers[i] = BinaryWriter.createValueWriter(dataTypes[i]);
+            writers[i] = BinaryWriter.createValueWriter(dataTypes[i], INDEXED);
         }
 
         Random rnd = new Random();
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
index 6836bcb60..519fa2fd8 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.ALIGNED;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link AlignedRow}. */
@@ -546,7 +547,7 @@ class AlignedRowTest {
         AlignedRowWriter writer = new AlignedRowWriter(row);
         BinaryWriter.ValueWriter[] fieldSetters = new 
BinaryWriter.ValueWriter[10];
         for (int i = 0; i < fieldTypes.length; i++) {
-            fieldSetters[i] = BinaryWriter.createValueWriter(fieldTypes[i]);
+            fieldSetters[i] = BinaryWriter.createValueWriter(fieldTypes[i], 
ALIGNED);
         }
 
         // Test static write method for different data types
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowDeserializerTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowDeserializerTest.java
index c280c8ac8..20bfb633d 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowDeserializerTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowDeserializerTest.java
@@ -25,6 +25,7 @@ import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.row.array.PrimitiveBinaryArray;
 import org.apache.fluss.row.serializer.ArraySerializer;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypes;
@@ -34,6 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link CompactedRowDeserializer}. */
@@ -923,7 +925,7 @@ public class CompactedRowDeserializerTest {
         CompactedRowDeserializer deserializer = new 
CompactedRowDeserializer(types);
 
         BinaryArray intArray = BinaryArray.fromPrimitiveArray(new int[] {1, 2, 
3, 4, 5});
-        ArraySerializer arraySerializer = new ArraySerializer(DataTypes.INT());
+        ArraySerializer arraySerializer = new ArraySerializer(DataTypes.INT(), 
COMPACTED);
 
         CompactedRowWriter writer = new CompactedRowWriter(types.length);
         writer.writeInt(100);
@@ -948,14 +950,14 @@ public class CompactedRowDeserializerTest {
         DataType[] types = {DataTypes.ARRAY(DataTypes.STRING())};
         CompactedRowDeserializer deserializer = new 
CompactedRowDeserializer(types);
 
-        BinaryArray strArray = new BinaryArray();
+        BinaryArray strArray = new PrimitiveBinaryArray();
         BinaryArrayWriter strArrayWriter = new BinaryArrayWriter(strArray, 3, 
8);
         strArrayWriter.writeString(0, BinaryString.fromString("hello"));
         strArrayWriter.writeString(1, BinaryString.fromString("world"));
         strArrayWriter.writeString(2, BinaryString.fromString("test"));
         strArrayWriter.complete();
 
-        ArraySerializer arraySerializer = new 
ArraySerializer(DataTypes.STRING());
+        ArraySerializer arraySerializer = new 
ArraySerializer(DataTypes.STRING(), COMPACTED);
 
         CompactedRowWriter writer = new CompactedRowWriter(types.length);
         writer.writeArray(strArray, arraySerializer);
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowWriterTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowWriterTest.java
index 0dc9ea7bc..d84460ba4 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowWriterTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowWriterTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Random;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link CompactedRowWriter}. */
@@ -97,7 +98,7 @@ class CompactedRowWriterTest {
                 new CompactedRowReader.FieldReader[allDataTypes.length];
         for (int i = 0; i < allDataTypes.length; i++) {
             getters[i] = InternalRow.createFieldGetter(allDataTypes[i], i);
-            writers[i] = BinaryWriter.createValueWriter(allDataTypes[i]);
+            writers[i] = BinaryWriter.createValueWriter(allDataTypes[i], 
COMPACTED);
             readers[i] = CompactedRowReader.createFieldReader(allDataTypes[i]);
         }
         for (int i = 0; i < 1000; i++) {
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java
index 25cadcdab..bb36c1f77 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java
@@ -35,6 +35,7 @@ import java.math.BigInteger;
 import java.time.LocalDate;
 import java.time.LocalTime;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED;
 import static org.apache.fluss.row.BinaryString.fromString;
 import static org.apache.fluss.row.TestInternalRowGenerator.createAllTypes;
 import static org.apache.fluss.row.indexed.IndexedRowTest.assertAllTypeEquals;
@@ -69,7 +70,7 @@ public class IndexedRowReaderTest {
         IndexedRowReader.FieldReader[] readers = new 
IndexedRowReader.FieldReader[dataTypes.length];
         for (int i = 0; i < dataTypes.length; i++) {
             readers[i] = IndexedRowReader.createFieldReader(dataTypes[i]);
-            writers[i] = BinaryWriter.createValueWriter(dataTypes[i]);
+            writers[i] = BinaryWriter.createValueWriter(dataTypes[i], INDEXED);
         }
 
         IndexedRowWriter writer1 = new IndexedRowWriter(dataTypes);
@@ -105,10 +106,11 @@ public class IndexedRowReaderTest {
         
assertThat(reader.readTimestampNtz(5).toString()).isEqualTo("2023-10-25T12:01:13.182");
         
assertThat(reader.readTimestampLtz(1).toString()).isEqualTo("2023-10-25T12:01:13.182Z");
         
assertThat(reader.readTimestampLtz(5).toString()).isEqualTo("2023-10-25T12:01:13.182Z");
-        assertThatArray(reader.readArray())
+        assertThatArray(reader.readArray(dataTypes[19]))
                 .withElementType(DataTypes.INT())
                 .isEqualTo(GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 
102234));
-        InternalRow nestedRow = reader.readRow(3);
+        InternalRow nestedRow =
+                reader.readRow(dataTypes[20].getChildren().toArray(new 
DataType[0]));
         GenericRow expectedInnerRow = GenericRow.of(20);
         GenericRow expectedNestedRow = GenericRow.of(123, expectedInnerRow, 
fromString("Test"));
         assertThatRow(nestedRow)
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java
index b0f676ff7..9aea09e38 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java
@@ -39,6 +39,7 @@ import java.math.BigInteger;
 import java.time.LocalDate;
 import java.time.LocalTime;
 
+import static org.apache.fluss.row.BinaryRow.BinaryRowFormat.INDEXED;
 import static org.apache.fluss.row.BinaryString.fromString;
 import static org.apache.fluss.row.TestInternalRowGenerator.createAllTypes;
 import static org.apache.fluss.testutils.InternalArrayAssert.assertThatArray;
@@ -121,15 +122,15 @@ public class IndexedRowTest {
         row.pointTo(writer.segment(), 0, writer.position());
 
         InternalRow.FieldGetter[] fieldGetter = new 
InternalRow.FieldGetter[dataTypes.length];
-        IndexedRowWriter.FieldWriter[] writers = new 
IndexedRowWriter.FieldWriter[dataTypes.length];
+        BinaryWriter.ValueWriter[] writers = new 
BinaryWriter.ValueWriter[dataTypes.length];
         for (int i = 0; i < dataTypes.length; i++) {
             fieldGetter[i] = InternalRow.createFieldGetter(dataTypes[i], i);
-            writers[i] = IndexedRowWriter.createFieldWriter(dataTypes[i]);
+            writers[i] = BinaryWriter.createValueWriter(dataTypes[i], INDEXED);
         }
 
         IndexedRowWriter writer1 = new IndexedRowWriter(dataTypes);
         for (int i = 0; i < dataTypes.length; i++) {
-            writers[i].writeField(writer1, i, 
fieldGetter[i].getFieldOrNull(row));
+            writers[i].writeValue(writer1, i, 
fieldGetter[i].getFieldOrNull(row));
         }
 
         IndexedRow row1 = new IndexedRow(dataTypes);
@@ -178,7 +179,7 @@ public class IndexedRowTest {
 
         BinaryWriter.ValueWriter[] writers = new 
BinaryWriter.ValueWriter[dataTypes.length];
         for (int i = 0; i < dataTypes.length; i++) {
-            writers[i] = BinaryWriter.createValueWriter(dataTypes[i]);
+            writers[i] = BinaryWriter.createValueWriter(dataTypes[i], INDEXED);
         }
 
         writers[0].writeValue(writer, 0, true);

Reply via email to