This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-2079
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 04053f3a09ea56bb42ebe67f16cbaa728b9e36bc
Author: forwardxu <[email protected]>
AuthorDate: Tue Dec 23 14:23:51 2025 +0800

    fix failed cases
---
 fluss-client/pom.xml                               |   8 -
 .../scanner/log/DefaultCompletedFetchTest.java     |  28 ++-
 .../java/org/apache/fluss/row/BinaryArrayTest.java |  32 +++
 .../apache/fluss/row/TestInternalRowGenerator.java |  23 +-
 .../apache/fluss/row/aligned/AlignedRowTest.java   |  76 ++++++
 .../fluss/row/compacted/CompactedRowTest.java      |  95 ++++++++
 .../fluss/row/encode/AlignedRowEncoderTest.java    | 252 ++++++++++++++++++++
 .../fluss/row/indexed/IndexedRowReaderTest.java    |  15 +-
 .../apache/fluss/row/indexed/IndexedRowTest.java   | 152 +++++++++++-
 .../org/apache/fluss/testutils/DataTestUtils.java  |  51 +++-
 fluss-filesystems/fluss-fs-obs/pom.xml             |   2 -
 fluss-filesystems/fluss-fs-oss/pom.xml             |   2 -
 fluss-flink/fluss-flink-1.18/pom.xml               |   8 -
 fluss-flink/fluss-flink-1.19/pom.xml               |   8 -
 fluss-flink/fluss-flink-1.20/pom.xml               |   8 -
 fluss-flink/fluss-flink-2.1/pom.xml                |   8 -
 .../fluss/flink/utils/FlinkArrayConverter.java     | 262 ---------------------
 .../fluss/flink/utils/FlinkRowConverter.java       | 213 -----------------
 .../flink/utils/FlussRowToFlinkRowConverter.java   |  42 +++-
 .../fluss/flink/sink/FlinkComplexTypeITCase.java   |  26 ++
 .../utils/FlinkRowToFlussRowConverterTest.java     |  25 +-
 .../utils/FlussRowToFlinkRowConverterTest.java     |  35 ++-
 .../iceberg/source/IcebergRecordAsFlussRow.java    |   3 +-
 fluss-lake/fluss-lake-lance/pom.xml                |   8 -
 fluss-server/pom.xml                               |   8 -
 pom.xml                                            |   3 +
 26 files changed, 827 insertions(+), 566 deletions(-)

diff --git a/fluss-client/pom.xml b/fluss-client/pom.xml
index 456febeec..481bf6ecd 100644
--- a/fluss-client/pom.xml
+++ b/fluss-client/pom.xml
@@ -113,14 +113,6 @@
                                     <include>*:*</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>*</artifact>
-                                    <excludes>
-                                        <exclude>LICENSE*</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index 72739ef26..e5b7eeff7 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -227,21 +227,32 @@ public class DefaultCompletedFetchTest {
                         new Object[] {
                             1,
                             new String[] {"a", "b"},
-                            new Object[] {new int[] {1, 2}, new int[] {3, 4}}
+                            new Object[] {new int[] {1, 2}, new int[] {3, 4}},
+                            new Object[] {10, new Object[] {20, "nested"}, "row1"}
                         },
                         new Object[] {
-                            2, new String[] {"c", null}, new Object[] {null, new int[] {3, 4}}
+                            2,
+                            new String[] {"c", null},
+                            new Object[] {null, new int[] {3, 4}},
+                            new Object[] {30, new Object[] {40, "test"}, "row2"}
                         },
                         new Object[] {
                             3,
                             new String[] {"e", "f"},
-                            new Object[] {new int[] {5, 6, 7}, new int[] {8}}
+                            new Object[] {new int[] {5, 6, 7}, new int[] {8}},
+                            new Object[] {50, new Object[] {60, "value"}, "row3"}
                         });
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
                         .column("b", DataTypes.ARRAY(DataTypes.STRING()))
                         .column("c", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        .column(
+                                "d",
+                                DataTypes.ROW(
+                                        DataTypes.INT(),
+                                        DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()),
+                                        DataTypes.STRING()))
                         .build();
         TableInfo tableInfo =
                 TableInfo.of(
@@ -299,6 +310,17 @@ public class DefaultCompletedFetchTest {
                     .isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[1]));
             assertThat(row.getArray(2).toString())
                     .isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[2]));
+            InternalRow nestedRow = row.getRow(3, 3);
+            assertThat(nestedRow).isNotNull();
+            assertThat(nestedRow.getInt(0)).isEqualTo(((Object[]) complexData.get(i)[3])[0]);
+            InternalRow deeplyNestedRow = nestedRow.getRow(1, 2);
+            assertThat(deeplyNestedRow).isNotNull();
+            assertThat(deeplyNestedRow.getInt(0))
+                    .isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[0]);
+            assertThat(deeplyNestedRow.getString(1).toString())
+                    .isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[1]);
+            assertThat(nestedRow.getString(2).toString())
+                    .isEqualTo(((Object[]) complexData.get(i)[3])[2]);
         }
     }
 
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java
index 14230b62e..d3054ead6 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/BinaryArrayTest.java
@@ -929,4 +929,36 @@ public class BinaryArrayTest {
         assertThat(BinaryArray.calculateFixLengthPartSize(DataTypes.CHAR(10))).isEqualTo(8);
         assertThat(BinaryArray.calculateFixLengthPartSize(DataTypes.BINARY(20))).isEqualTo(8);
     }
+
+    @Test
+    public void testPrimitiveBinaryArrayGetRowThrowsException() {
+        PrimitiveBinaryArray array = new PrimitiveBinaryArray();
+        BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
+        writer.writeInt(0, 10);
+        writer.writeInt(1, 20);
+        writer.writeInt(2, 30);
+        writer.complete();
+
+        assertThatThrownBy(() -> array.getRow(0, 2))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Can not get nested row from array of primitive type");
+    }
+
+    @Test
+    public void testPrimitiveBinaryArrayGetArray() {
+        PrimitiveBinaryArray innerArray1 = new PrimitiveBinaryArray();
+        BinaryArrayWriter innerWriter1 = new BinaryArrayWriter(innerArray1, 2, 4);
+        innerWriter1.writeInt(0, 10);
+        innerWriter1.writeInt(1, 20);
+        innerWriter1.complete();
+
+        PrimitiveBinaryArray outerArray = new PrimitiveBinaryArray();
+        BinaryArrayWriter outerWriter = new BinaryArrayWriter(outerArray, 1, 8);
+        outerWriter.setOffsetAndSize(0, 0, innerArray1.getSizeInBytes());
+        outerWriter.complete();
+
+        InternalArray result = outerArray.getArray(0);
+        assertThat(result).isNotNull();
+        assertThat(result).isInstanceOf(PrimitiveBinaryArray.class);
+    }
 }
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java b/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java
index d6d1b6a15..a9e08315d 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/TestInternalRowGenerator.java
@@ -63,9 +63,15 @@ public class TestInternalRowGenerator {
                 new DataField("f17", DataTypes.TIMESTAMP_LTZ(1)),
                 new DataField("f18", DataTypes.TIMESTAMP_LTZ(5)),
                 new DataField("f19", DataTypes.ARRAY(DataTypes.INT())),
-                // TODO: Add Map and Row fields in Issue #1973
                 new DataField(
                         "f20",
+                        DataTypes.ARRAY(DataTypes.FLOAT().copy(false))), // vector embedding type
+                new DataField(
+                        "f21",
+                        DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))), // nested array
+                // TODO: Add Map and Row fields in Issue #1973
+                new DataField(
+                        "f22",
                         DataTypes.ROW(
                                 new DataField("u1", DataTypes.INT()),
                                 new DataField(
@@ -123,6 +129,17 @@ public class TestInternalRowGenerator {
         GenericArray array1 = GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234);
         setRandomNull(writers[19], writer, 19, rnd, array1);
 
+        GenericArray array2 =
+                GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE);
+        setRandomNull(writers[20], writer, 20, rnd, array2);
+
+        GenericArray array3 =
+                GenericArray.of(
+                        GenericArray.of(fromString("a"), null, fromString("c")),
+                        null,
+                        GenericArray.of(fromString("hello"), fromString("world")));
+        setRandomNull(writers[21], writer, 21, rnd, array3);
+
         // TODO: Map type support will be added in Issue #1973
         // Map<Object, Object> javaMap = new HashMap<>();
         // javaMap.put(0, null);
@@ -131,9 +148,9 @@ public class TestInternalRowGenerator {
         // GenericMap map = new GenericMap(javaMap);
         // setRandomNull(writers[20], writer, 20, rnd, map);
 
-        GenericRow innerRow = GenericRow.of(20);
+        GenericRow innerRow = GenericRow.of(22);
         GenericRow genericRow = GenericRow.of(123, innerRow, BinaryString.fromString("Test"));
-        setRandomNull(writers[20], writer, 20, rnd, genericRow);
+        setRandomNull(writers[22], writer, 22, rnd, genericRow);
 
         IndexedRow row = new IndexedRow(dataTypes);
         row.pointTo(writer.segment(), 0, writer.position());
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
index 519fa2fd8..c9a8be0ce 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/aligned/AlignedRowTest.java
@@ -21,6 +21,8 @@ import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.BinaryWriter;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataType;
@@ -744,4 +746,78 @@ class AlignedRowTest {
         assertThat(row.getTimestampNtz(10, 9).toString()).contains("2021-01-01T00:00:00.000123456");
         assertThat(row.isNullAt(11)).isTrue();
     }
+
+    @Test
+    public void testAlignedArrayGetRow() {
+        DataType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.INT()),
+                        DataTypes.FIELD("f2", DataTypes.STRING()));
+        DataType arrayType = DataTypes.ARRAY(rowType);
+
+        AlignedRow outerRow = new AlignedRow(1);
+        AlignedRowWriter outerWriter = new AlignedRowWriter(outerRow);
+
+        org.apache.fluss.row.GenericRow row1 =
+                org.apache.fluss.row.GenericRow.of(100, BinaryString.fromString("nested"));
+        org.apache.fluss.row.GenericRow row2 =
+                org.apache.fluss.row.GenericRow.of(200, BinaryString.fromString("nested2"));
+        org.apache.fluss.row.GenericArray arrayData =
+                org.apache.fluss.row.GenericArray.of(row1, row2);
+
+        BinaryWriter.ValueWriter arrayWriter = BinaryWriter.createValueWriter(arrayType, ALIGNED);
+        arrayWriter.writeValue(outerWriter, 0, arrayData);
+        outerWriter.complete();
+
+        InternalArray array = outerRow.getArray(0);
+        assertThat(array).isNotNull();
+        assertThat(array.size()).isEqualTo(2);
+
+        InternalRow nestedRow1 = array.getRow(0, 2);
+        assertThat(nestedRow1.getInt(0)).isEqualTo(100);
+        assertThat(nestedRow1.getString(1)).isEqualTo(BinaryString.fromString("nested"));
+
+        InternalRow nestedRow2 = array.getRow(1, 2);
+        assertThat(nestedRow2.getInt(0)).isEqualTo(200);
+        assertThat(nestedRow2.getString(1)).isEqualTo(BinaryString.fromString("nested2"));
+    }
+
+    @Test
+    public void testAlignedArrayGetNestedArray() {
+        DataType innerArrayType = DataTypes.ARRAY(DataTypes.INT());
+        DataType outerArrayType = DataTypes.ARRAY(innerArrayType);
+
+        AlignedRow outerRow = new AlignedRow(1);
+        AlignedRowWriter outerWriter = new AlignedRowWriter(outerRow);
+
+        org.apache.fluss.row.GenericArray innerArray1 =
+                org.apache.fluss.row.GenericArray.of(10, 20, 30);
+        org.apache.fluss.row.GenericArray innerArray2 =
+                org.apache.fluss.row.GenericArray.of(40, 50, 60);
+        org.apache.fluss.row.GenericArray outerArrayData =
+                org.apache.fluss.row.GenericArray.of(innerArray1, innerArray2);
+
+        BinaryWriter.ValueWriter arrayWriter =
+                BinaryWriter.createValueWriter(outerArrayType, ALIGNED);
+        arrayWriter.writeValue(outerWriter, 0, outerArrayData);
+        outerWriter.complete();
+
+        InternalArray outerArray = outerRow.getArray(0);
+        assertThat(outerArray).isNotNull();
+        assertThat(outerArray.size()).isEqualTo(2);
+
+        InternalArray nestedArray1 = outerArray.getArray(0);
+        assertThat(nestedArray1).isNotNull();
+        assertThat(nestedArray1.size()).isEqualTo(3);
+        assertThat(nestedArray1.getInt(0)).isEqualTo(10);
+        assertThat(nestedArray1.getInt(1)).isEqualTo(20);
+        assertThat(nestedArray1.getInt(2)).isEqualTo(30);
+
+        InternalArray nestedArray2 = outerArray.getArray(1);
+        assertThat(nestedArray2).isNotNull();
+        assertThat(nestedArray2.size()).isEqualTo(3);
+        assertThat(nestedArray2.getInt(0)).isEqualTo(40);
+        assertThat(nestedArray2.getInt(1)).isEqualTo(50);
+        assertThat(nestedArray2.getInt(2)).isEqualTo(60);
+    }
 }
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowTest.java
index 72382f67a..ea1bdd11f 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/compacted/CompactedRowTest.java
@@ -28,6 +28,7 @@ import org.apache.fluss.types.DataTypes;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link CompactedRow}. */
 public class CompactedRowTest {
@@ -412,4 +413,98 @@ public class CompactedRowTest {
             assertThat(row.getInt(i)).isEqualTo(i * 10);
         }
     }
+
+    @Test
+    public void testCompactedArrayGetRowWithInvalidType() {
+        DataType arrayType = DataTypes.ARRAY(DataTypes.INT());
+        DataType[] fieldTypes = {arrayType};
+
+        CompactedRow outerRow = new CompactedRow(fieldTypes);
+        CompactedRowWriter outerWriter = new CompactedRowWriter(fieldTypes.length);
+
+        org.apache.fluss.row.GenericArray arrayData = org.apache.fluss.row.GenericArray.of(1, 2, 3);
+        org.apache.fluss.row.serializer.ArraySerializer serializer =
+                new org.apache.fluss.row.serializer.ArraySerializer(
+                        DataTypes.INT(), org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED);
+        outerWriter.writeArray(arrayData, serializer);
+
+        outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position());
+
+        org.apache.fluss.row.InternalArray array = outerRow.getArray(0);
+        assertThatThrownBy(() -> array.getRow(0, 1))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Can not get row from Array of type");
+    }
+
+    @Test
+    public void testCompactedArrayGetRowWithWrongNumFields() {
+        DataType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.INT()),
+                        DataTypes.FIELD("f2", DataTypes.STRING()));
+        DataType arrayType = DataTypes.ARRAY(rowType);
+        DataType[] fieldTypes = {arrayType};
+
+        CompactedRow outerRow = new CompactedRow(fieldTypes);
+        CompactedRowWriter outerWriter = new CompactedRowWriter(fieldTypes.length);
+
+        org.apache.fluss.row.GenericRow innerRow =
+                org.apache.fluss.row.GenericRow.of(100, BinaryString.fromString("test"));
+        org.apache.fluss.row.GenericArray arrayData =
+                org.apache.fluss.row.GenericArray.of(innerRow);
+
+        org.apache.fluss.row.serializer.ArraySerializer serializer =
+                new org.apache.fluss.row.serializer.ArraySerializer(
+                        rowType, org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED);
+        outerWriter.writeArray(arrayData, serializer);
+
+        outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position());
+
+        org.apache.fluss.row.InternalArray array = outerRow.getArray(0);
+        assertThatThrownBy(() -> array.getRow(0, 5))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unexpected number of fields");
+    }
+
+    @Test
+    public void testCompactedArrayGetNestedArray() {
+        DataType innerArrayType = DataTypes.ARRAY(DataTypes.INT());
+        DataType outerArrayType = DataTypes.ARRAY(innerArrayType);
+        DataType[] fieldTypes = {outerArrayType};
+
+        CompactedRow outerRow = new CompactedRow(fieldTypes);
+        CompactedRowWriter outerWriter = new CompactedRowWriter(fieldTypes.length);
+
+        org.apache.fluss.row.GenericArray innerArray1 =
+                org.apache.fluss.row.GenericArray.of(10, 20, 30);
+        org.apache.fluss.row.GenericArray innerArray2 =
+                org.apache.fluss.row.GenericArray.of(40, 50, 60);
+        org.apache.fluss.row.GenericArray outerArrayData =
+                org.apache.fluss.row.GenericArray.of(innerArray1, innerArray2);
+
+        org.apache.fluss.row.serializer.ArraySerializer serializer =
+                new org.apache.fluss.row.serializer.ArraySerializer(
+                        innerArrayType, org.apache.fluss.row.BinaryRow.BinaryRowFormat.COMPACTED);
+        outerWriter.writeArray(outerArrayData, serializer);
+
+        outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position());
+
+        org.apache.fluss.row.InternalArray outerArray = outerRow.getArray(0);
+        assertThat(outerArray).isNotNull();
+        assertThat(outerArray.size()).isEqualTo(2);
+
+        org.apache.fluss.row.InternalArray nestedArray1 = outerArray.getArray(0);
+        assertThat(nestedArray1).isNotNull();
+        assertThat(nestedArray1.size()).isEqualTo(3);
+        assertThat(nestedArray1.getInt(0)).isEqualTo(10);
+        assertThat(nestedArray1.getInt(1)).isEqualTo(20);
+        assertThat(nestedArray1.getInt(2)).isEqualTo(30);
+
+        org.apache.fluss.row.InternalArray nestedArray2 = outerArray.getArray(1);
+        assertThat(nestedArray2).isNotNull();
+        assertThat(nestedArray2.size()).isEqualTo(3);
+        assertThat(nestedArray2.getInt(0)).isEqualTo(40);
+        assertThat(nestedArray2.getInt(1)).isEqualTo(50);
+        assertThat(nestedArray2.getInt(2)).isEqualTo(60);
+    }
 }
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/encode/AlignedRowEncoderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/encode/AlignedRowEncoderTest.java
new file mode 100644
index 000000000..be0c31f99
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/row/encode/AlignedRowEncoderTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.encode;
+
+import org.apache.fluss.row.BinaryRow;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AlignedRowEncoder}. */
+class AlignedRowEncoderTest {
+
+    @Test
+    void testEncodeSimpleTypes() throws Exception {
+        DataType[] fieldTypes =
+                new DataType[] {
+                    DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BOOLEAN()
+                };
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            encoder.startNewRow();
+            encoder.encodeField(0, 100);
+            encoder.encodeField(1, 200L);
+            encoder.encodeField(2, BinaryString.fromString("test"));
+            encoder.encodeField(3, true);
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getInt(0)).isEqualTo(100);
+            assertThat(row.getLong(1)).isEqualTo(200L);
+            assertThat(row.getString(2)).isEqualTo(BinaryString.fromString("test"));
+            assertThat(row.getBoolean(3)).isTrue();
+        }
+    }
+
+    @Test
+    void testEncodeMultipleRows() throws Exception {
+        DataType[] fieldTypes = new DataType[] {DataTypes.INT(), DataTypes.STRING()};
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            encoder.startNewRow();
+            encoder.encodeField(0, 1);
+            encoder.encodeField(1, BinaryString.fromString("first"));
+            BinaryRow row1 = encoder.finishRow();
+            BinaryRow row1Copy = row1.copy();
+
+            encoder.startNewRow();
+            encoder.encodeField(0, 2);
+            encoder.encodeField(1, BinaryString.fromString("second"));
+            BinaryRow row2 = encoder.finishRow();
+
+            assertThat(row1Copy.getInt(0)).isEqualTo(1);
+            assertThat(row1Copy.getString(1)).isEqualTo(BinaryString.fromString("first"));
+
+            assertThat(row2.getInt(0)).isEqualTo(2);
+            assertThat(row2.getString(1)).isEqualTo(BinaryString.fromString("second"));
+        }
+    }
+
+    @Test
+    void testEncodeAllPrimitiveDataTypes() throws Exception {
+        DataType[] fieldTypes =
+                new DataType[] {
+                    DataTypes.TINYINT(),
+                    DataTypes.SMALLINT(),
+                    DataTypes.INT(),
+                    DataTypes.BIGINT(),
+                    DataTypes.FLOAT(),
+                    DataTypes.DOUBLE(),
+                    DataTypes.BOOLEAN(),
+                    DataTypes.STRING(),
+                    DataTypes.BYTES(),
+                    DataTypes.DECIMAL(10, 2)
+                };
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            encoder.startNewRow();
+            encoder.encodeField(0, (byte) 1);
+            encoder.encodeField(1, (short) 100);
+            encoder.encodeField(2, 1000);
+            encoder.encodeField(3, 10000L);
+            encoder.encodeField(4, 1.5f);
+            encoder.encodeField(5, 2.5);
+            encoder.encodeField(6, false);
+            encoder.encodeField(7, BinaryString.fromString("hello"));
+            encoder.encodeField(8, new byte[] {1, 2, 3});
+            encoder.encodeField(9, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2));
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getByte(0)).isEqualTo((byte) 1);
+            assertThat(row.getShort(1)).isEqualTo((short) 100);
+            assertThat(row.getInt(2)).isEqualTo(1000);
+            assertThat(row.getLong(3)).isEqualTo(10000L);
+            assertThat(row.getFloat(4)).isEqualTo(1.5f);
+            assertThat(row.getDouble(5)).isEqualTo(2.5);
+            assertThat(row.getBoolean(6)).isFalse();
+            assertThat(row.getString(7)).isEqualTo(BinaryString.fromString("hello"));
+            assertThat(row.getBytes(8)).isEqualTo(new byte[] {1, 2, 3});
+            assertThat(row.getDecimal(9, 10, 2))
+                    .isEqualTo(Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2));
+        }
+    }
+
+    @Test
+    void testEncodeWithNullValues() throws Exception {
+        DataType[] fieldTypes =
+                new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()};
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            encoder.startNewRow();
+            encoder.encodeField(0, 42);
+            encoder.encodeField(1, null);
+            encoder.encodeField(2, 100L);
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getInt(0)).isEqualTo(42);
+            assertThat(row.isNullAt(1)).isTrue();
+            assertThat(row.getLong(2)).isEqualTo(100L);
+        }
+    }
+
+    @Test
+    void testEncodeTimestampTypes() throws Exception {
+        DataType[] fieldTypes = new DataType[] {DataTypes.TIMESTAMP_LTZ(3), DataTypes.INT()};
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            TimestampLtz timestampLtz = TimestampLtz.fromEpochMillis(1000000L);
+
+            encoder.startNewRow();
+            encoder.encodeField(0, timestampLtz);
+            encoder.encodeField(1, 999);
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getTimestampLtz(0, 3)).isNotNull();
+            assertThat(row.getInt(1)).isEqualTo(999);
+        }
+    }
+
+    @Test
+    void testReuseEncoder() throws Exception {
+        DataType[] fieldTypes = new DataType[] {DataTypes.INT(), DataTypes.STRING()};
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            encoder.startNewRow();
+            encoder.encodeField(0, 5);
+            encoder.encodeField(1, BinaryString.fromString("row5"));
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getInt(0)).isEqualTo(5);
+            assertThat(row.getString(1)).isEqualTo(BinaryString.fromString("row5"));
+
+            encoder.startNewRow();
+            encoder.encodeField(0, 10);
+            encoder.encodeField(1, BinaryString.fromString("row10"));
+            row = encoder.finishRow();
+
+            assertThat(row.getInt(0)).isEqualTo(10);
+            assertThat(row.getString(1)).isEqualTo(BinaryString.fromString("row10"));
+        }
+    }
+
+    @Test
+    void testEncodeEmptyRow() throws Exception {
+        DataType[] fieldTypes = new DataType[] {};
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            encoder.startNewRow();
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getFieldCount()).isEqualTo(0);
+        }
+    }
+
+    @Test
+    void testEncodeSingleField() throws Exception {
+        DataType[] fieldTypes = new DataType[] {DataTypes.STRING()};
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            encoder.startNewRow();
+            encoder.encodeField(0, BinaryString.fromString("single"));
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getString(0)).isEqualTo(BinaryString.fromString("single"));
+        }
+    }
+
+    @Test
+    void testClose() throws Exception {
+        DataType[] fieldTypes = new DataType[] {DataTypes.INT()};
+        AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes);
+        encoder.close();
+    }
+
+    @Test
+    void testEncodeWithLargeStrings() throws Exception {
+        DataType[] fieldTypes = new DataType[] {DataTypes.STRING(), DataTypes.INT()};
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < 1000; i++) {
+                sb.append("test");
+            }
+            String largeString = sb.toString();
+
+            encoder.startNewRow();
+            encoder.encodeField(0, BinaryString.fromString(largeString));
+            encoder.encodeField(1, 123);
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getString(0)).isEqualTo(BinaryString.fromString(largeString));
+            assertThat(row.getInt(1)).isEqualTo(123);
+        }
+    }
+
+    @Test
+    void testEncodeWithBinaryData() throws Exception {
+        DataType[] fieldTypes = new DataType[] {DataTypes.BYTES(), DataTypes.BYTES()};
+        try (AlignedRowEncoder encoder = new AlignedRowEncoder(fieldTypes)) {
+            byte[] data1 = new byte[100];
+            byte[] data2 = new byte[200];
+            for (int i = 0; i < data1.length; i++) {
+                data1[i] = (byte) i;
+            }
+            for (int i = 0; i < data2.length; i++) {
+                data2[i] = (byte) (i % 256);
+            }
+
+            encoder.startNewRow();
+            encoder.encodeField(0, data1);
+            encoder.encodeField(1, data2);
+            BinaryRow row = encoder.finishRow();
+
+            assertThat(row.getBytes(0)).isEqualTo(data1);
+            assertThat(row.getBytes(1)).isEqualTo(data2);
+        }
+    }
+}
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java
index bb36c1f77..56a307563 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowReaderTest.java
@@ -109,9 +109,20 @@ public class IndexedRowReaderTest {
         assertThatArray(reader.readArray(dataTypes[19]))
                 .withElementType(DataTypes.INT())
                 .isEqualTo(GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234));
+        assertThatArray(reader.readArray(dataTypes[20]))
+                .withElementType(DataTypes.FLOAT())
+                .isEqualTo(
+                        GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE));
+        assertThatArray(reader.readArray(dataTypes[21]))
+                .withElementType(DataTypes.ARRAY(DataTypes.STRING()))
+                .isEqualTo(
+                        GenericArray.of(
+                                GenericArray.of(fromString("a"), null, fromString("c")),
+                                null,
+                                GenericArray.of(fromString("hello"), fromString("world"))));
         InternalRow nestedRow =
-                reader.readRow(dataTypes[20].getChildren().toArray(new DataType[0]));
-        GenericRow expectedInnerRow = GenericRow.of(20);
+                reader.readRow(dataTypes[22].getChildren().toArray(new DataType[0]));
+        GenericRow expectedInnerRow = GenericRow.of(22);
         GenericRow expectedNestedRow = GenericRow.of(123, expectedInnerRow, fromString("Test"));
         assertThatRow(nestedRow)
                 .withSchema(
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java
index 9aea09e38..3e4066991 100644
--- a/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/row/indexed/IndexedRowTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.row.BinaryWriter;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
@@ -74,7 +75,7 @@ public class IndexedRowTest {
 
         assertAllTypeEquals(row);
 
-        assertThat(row.getFieldCount()).isEqualTo(21);
+        assertThat(row.getFieldCount()).isEqualTo(23);
         assertThat(row.anyNull()).isFalse();
         assertThat(row.anyNull(new int[] {0, 1})).isFalse();
     }
@@ -204,10 +205,21 @@ public class IndexedRowTest {
         writers[17].writeValue(writer, 17, TimestampLtz.fromEpochMillis(1698235273182L));
         writers[18].writeValue(writer, 18, TimestampLtz.fromEpochMillis(1698235273182L));
         writers[19].writeValue(writer, 19, GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234));
-
-        GenericRow innerRow = GenericRow.of(20);
+        writers[20].writeValue(
+                writer,
+                20,
+                GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE));
+        writers[21].writeValue(
+                writer,
+                21,
+                GenericArray.of(
+                        GenericArray.of(fromString("a"), null, fromString("c")),
+                        null,
+                        GenericArray.of(fromString("hello"), fromString("world"))));
+
+        GenericRow innerRow = GenericRow.of(22);
         GenericRow nestedRow = GenericRow.of(123, innerRow, fromString("Test"));
-        writers[20].writeValue(writer, 20, nestedRow);
+        writers[22].writeValue(writer, 22, nestedRow);
 
         return writer;
     }
@@ -236,9 +248,20 @@ public class IndexedRowTest {
         assertThatArray(row.getArray(19))
                 .withElementType(DataTypes.INT())
                 .isEqualTo(GenericArray.of(1, 2, 3, 4, 5, -11, null, 444, 102234));
-        GenericRow expectedInnerRow = GenericRow.of(20);
+        assertThatArray(row.getArray(20))
+                .withElementType(DataTypes.FLOAT().copy(false))
+                .isEqualTo(
+                        GenericArray.of(0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE));
+        assertThatArray(row.getArray(21))
+                .withElementType(DataTypes.ARRAY(DataTypes.STRING()))
+                .isEqualTo(
+                        GenericArray.of(
+                                GenericArray.of(fromString("a"), null, fromString("c")),
+                                null,
+                                GenericArray.of(fromString("hello"), fromString("world"))));
+        GenericRow expectedInnerRow = GenericRow.of(22);
         GenericRow expectedNestedRow = GenericRow.of(123, expectedInnerRow, fromString("Test"));
-        assertThatRow(row.getRow(20, 3))
+        assertThatRow(row.getRow(22, 3))
                 .withSchema(
                         DataTypes.ROW(
                                 DataTypes.FIELD("u1", DataTypes.INT()),
@@ -248,4 +271,121 @@ public class IndexedRowTest {
                                 DataTypes.FIELD("u3", DataTypes.STRING())))
                 .isEqualTo(expectedNestedRow);
     }
+
+    @Test
+    void testIndexedArrayGetRow() {
+        DataType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.INT()),
+                        DataTypes.FIELD("f2", DataTypes.STRING()));
+
+        DataType[] fieldTypes = new DataType[] {DataTypes.ARRAY(rowType)};
+        IndexedRow outerRow = new IndexedRow(fieldTypes);
+        IndexedRowWriter outerWriter = new IndexedRowWriter(fieldTypes);
+
+        GenericRow row1 = GenericRow.of(100, fromString("first"));
+        GenericRow row2 = GenericRow.of(200, fromString("second"));
+        GenericArray arrayData = GenericArray.of(row1, row2);
+
+        BinaryWriter.ValueWriter arrayWriter =
+                BinaryWriter.createValueWriter(fieldTypes[0], INDEXED);
+        arrayWriter.writeValue(outerWriter, 0, arrayData);
+
+        outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position());
+
+        InternalArray array = outerRow.getArray(0);
+        assertThat(array).isNotNull();
+        assertThat(array.size()).isEqualTo(2);
+
+        InternalRow nestedRow1 = array.getRow(0, 2);
+        assertThat(nestedRow1.getInt(0)).isEqualTo(100);
+        assertThat(nestedRow1.getString(1)).isEqualTo(fromString("first"));
+
+        InternalRow nestedRow2 = array.getRow(1, 2);
+        assertThat(nestedRow2.getInt(0)).isEqualTo(200);
+        assertThat(nestedRow2.getString(1)).isEqualTo(fromString("second"));
+    }
+
+    @Test
+    void testIndexedArrayGetRowWithInvalidType() {
+        DataType[] fieldTypes = new DataType[] {DataTypes.ARRAY(DataTypes.INT())};
+        IndexedRow outerRow = new IndexedRow(fieldTypes);
+        IndexedRowWriter outerWriter = new IndexedRowWriter(fieldTypes);
+
+        GenericArray arrayData = GenericArray.of(1, 2, 3);
+        BinaryWriter.ValueWriter arrayWriter =
+                BinaryWriter.createValueWriter(fieldTypes[0], INDEXED);
+        arrayWriter.writeValue(outerWriter, 0, arrayData);
+
+        outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position());
+
+        InternalArray array = outerRow.getArray(0);
+        assertThatThrownBy(() -> array.getRow(0, 1))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Can not get row from Array of type");
+    }
+
+    @Test
+    void testIndexedArrayGetRowWithWrongNumFields() {
+        DataType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("f1", DataTypes.INT()),
+                        DataTypes.FIELD("f2", DataTypes.STRING()));
+
+        DataType[] fieldTypes = new DataType[] {DataTypes.ARRAY(rowType)};
+        IndexedRow outerRow = new IndexedRow(fieldTypes);
+        IndexedRowWriter outerWriter = new IndexedRowWriter(fieldTypes);
+
+        GenericRow row1 = GenericRow.of(100, fromString("first"));
+        GenericArray arrayData = GenericArray.of(row1);
+
+        BinaryWriter.ValueWriter arrayWriter =
+                BinaryWriter.createValueWriter(fieldTypes[0], INDEXED);
+        arrayWriter.writeValue(outerWriter, 0, arrayData);
+
+        outerRow.pointTo(outerWriter.segment(), 0, outerWriter.position());
+
+        InternalArray array = outerRow.getArray(0);
+        assertThatThrownBy(() -> array.getRow(0, 5))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unexpected number of fields");
+    }
+
+    @Test
+    void testIndexedArrayGetNestedArray() {
+        DataType innerArrayType = DataTypes.ARRAY(DataTypes.INT());
+        DataType outerArrayType = DataTypes.ARRAY(innerArrayType);
+
+        DataType[] fieldTypes = new DataType[] {outerArrayType};
+        IndexedRow row = new IndexedRow(fieldTypes);
+        IndexedRowWriter rowWriter = new IndexedRowWriter(fieldTypes);
+
+        GenericArray innerArray1 = GenericArray.of(1, 2, 3);
+        GenericArray innerArray2 = GenericArray.of(4, 5, 6);
+        GenericArray outerArrayData = GenericArray.of(innerArray1, innerArray2);
+
+        BinaryWriter.ValueWriter arrayWriter =
+                BinaryWriter.createValueWriter(fieldTypes[0], INDEXED);
+        arrayWriter.writeValue(rowWriter, 0, outerArrayData);
+
+        row.pointTo(rowWriter.segment(), 0, rowWriter.position());
+
+        InternalArray outerArray = row.getArray(0);
+        assertThat(outerArray).isNotNull();
+        assertThat(outerArray.size()).isEqualTo(2);
+
+        InternalArray nestedArray1 = outerArray.getArray(0);
+        assertThat(nestedArray1).isNotNull();
+        assertThat(nestedArray1.size()).isEqualTo(3);
+        assertThat(nestedArray1.getInt(0)).isEqualTo(1);
+        assertThat(nestedArray1.getInt(1)).isEqualTo(2);
+        assertThat(nestedArray1.getInt(2)).isEqualTo(3);
+
+        InternalArray nestedArray2 = outerArray.getArray(1);
+        assertThat(nestedArray2).isNotNull();
+        assertThat(nestedArray2.size()).isEqualTo(3);
+        assertThat(nestedArray2.getInt(0)).isEqualTo(4);
+        assertThat(nestedArray2.getInt(1)).isEqualTo(5);
+        assertThat(nestedArray2.getInt(2)).isEqualTo(6);
+    }
 }
diff --git a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
index 3cb1834f1..662faca60 100644
--- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
+++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java
@@ -104,6 +104,16 @@ public class DataTestUtils {
         return row;
     }
 
+    public static GenericRow row(RowType rowType, Object... objects) {
+        GenericRow row = new GenericRow(objects.length);
+        List<DataType> fieldTypes = rowType.getChildren();
+        for (int i = 0; i < objects.length; i++) {
+            Object value = toInternalObject(objects[i], fieldTypes.get(i));
+            row.setField(i, value);
+        }
+        return row;
+    }
+
     private static Object toInternalObject(Object obj) {
         if (obj == null) {
             return null;
@@ -124,6 +134,45 @@ public class DataTestUtils {
         }
     }
 
+    private static Object toInternalObject(Object obj, DataType dataType) {
+        if (obj == null) {
+            return null;
+        }
+
+        DataTypeRoot typeRoot = dataType.getTypeRoot();
+
+        if (typeRoot == DataTypeRoot.ROW) {
+            if (obj instanceof Object[]) {
+                return row((RowType) dataType, (Object[]) obj);
+            }
+            throw new IllegalArgumentException(
+                    "Expected Object[] for ROW type, but got: " + obj.getClass().getSimpleName());
+        }
+
+        if (typeRoot == DataTypeRoot.ARRAY) {
+            if (obj instanceof int[]) {
+                return new GenericArray((int[]) obj);
+            }
+            if (obj instanceof Object[]) {
+                DataType elementType = dataType.getChildren().get(0);
+                Object[] array = (Object[]) obj;
+                Object[] internalArray = new Object[array.length];
+                for (int j = 0; j < array.length; j++) {
+                    internalArray[j] = toInternalObject(array[j], elementType);
+                }
+                return new GenericArray(internalArray);
+            }
+            throw new IllegalArgumentException(
+                    "Expected array for ARRAY type, but got: " + obj.getClass().getSimpleName());
+        }
+
+        if (obj instanceof String) {
+            return BinaryString.fromString((String) obj);
+        }
+
+        return obj;
+    }
+
     public static CompactedRow compactedRow(RowType rowType, Object[] objects) {
         return genCompacted(rowType, objects);
     }
@@ -474,7 +523,7 @@ public class DataTestUtils {
             throws Exception {
         if (logFormat == LogFormat.ARROW) {
             List<InternalRow> rows =
-                    objects.stream().map(DataTestUtils::row).collect(Collectors.toList());
+                    objects.stream().map(objs -> row(rowType, objs)).collect(Collectors.toList());
             return createArrowMemoryLogRecords(
                     rowType,
                     offsetBase,
diff --git a/fluss-filesystems/fluss-fs-obs/pom.xml b/fluss-filesystems/fluss-fs-obs/pom.xml
index ccdd16c02..976cc3522 100644
--- a/fluss-filesystems/fluss-fs-obs/pom.xml
+++ b/fluss-filesystems/fluss-fs-obs/pom.xml
@@ -230,8 +230,6 @@
                                         <exclude>.gitkeep</exclude>
                                         <exclude>mime.types</exclude>
                                         <exclude>mozilla/**</exclude>
-                                        <exclude>LICENSE.txt</exclude>
-                                        <exclude>license/LICENSE*</exclude>
                                         <exclude>okhttp3/internal/publicsuffix/NOTICE</exclude>
                                         <exclude>NOTICE</exclude>
                                     </excludes>
diff --git a/fluss-filesystems/fluss-fs-oss/pom.xml b/fluss-filesystems/fluss-fs-oss/pom.xml
index 7517e5ec8..0d2b4cf41 100644
--- a/fluss-filesystems/fluss-fs-oss/pom.xml
+++ b/fluss-filesystems/fluss-fs-oss/pom.xml
@@ -214,8 +214,6 @@
                                         <exclude>.gitkeep</exclude>
                                         <exclude>mime.types</exclude>
                                         <exclude>mozilla/**</exclude>
-                                        <exclude>LICENSE.txt</exclude>
-                                        <exclude>license/LICENSE*</exclude>
                                         <exclude>okhttp3/internal/publicsuffix/NOTICE</exclude>
                                         <exclude>NOTICE</exclude>
                                     </excludes>
diff --git a/fluss-flink/fluss-flink-1.18/pom.xml b/fluss-flink/fluss-flink-1.18/pom.xml
index 66d359354..758dd91b1 100644
--- a/fluss-flink/fluss-flink-1.18/pom.xml
+++ b/fluss-flink/fluss-flink-1.18/pom.xml
@@ -219,14 +219,6 @@
                                     <include>org.apache.fluss:fluss-client</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>*</artifact>
-                                    <excludes>
-                                        <exclude>LICENSE*</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/fluss-flink/fluss-flink-1.19/pom.xml b/fluss-flink/fluss-flink-1.19/pom.xml
index 6cc58223f..4a100b438 100644
--- a/fluss-flink/fluss-flink-1.19/pom.xml
+++ b/fluss-flink/fluss-flink-1.19/pom.xml
@@ -213,14 +213,6 @@
                                     <include>org.apache.fluss:fluss-client</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>*</artifact>
-                                    <excludes>
-                                        <exclude>LICENSE*</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/fluss-flink/fluss-flink-1.20/pom.xml b/fluss-flink/fluss-flink-1.20/pom.xml
index b4ff8790f..68c43ed03 100644
--- a/fluss-flink/fluss-flink-1.20/pom.xml
+++ b/fluss-flink/fluss-flink-1.20/pom.xml
@@ -234,14 +234,6 @@
                                     <include>org.apache.fluss:fluss-client</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>*</artifact>
-                                    <excludes>
-                                        <exclude>LICENSE*</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/fluss-flink/fluss-flink-2.1/pom.xml b/fluss-flink/fluss-flink-2.1/pom.xml
index b03110c23..5b5475f3d 100644
--- a/fluss-flink/fluss-flink-2.1/pom.xml
+++ b/fluss-flink/fluss-flink-2.1/pom.xml
@@ -244,14 +244,6 @@
                                     <include>org.apache.fluss:fluss-client</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>*</artifact>
-                                    <excludes>
-                                        <exclude>LICENSE*</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java
deleted file mode 100644
index e9ac4d0bb..000000000
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.utils;
-
-import org.apache.fluss.row.InternalArray;
-import org.apache.fluss.types.ArrayType;
-import org.apache.fluss.types.DataType;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-
-import static org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.createInternalConverter;
-import static org.apache.fluss.types.DataTypeChecks.getLength;
-import static org.apache.fluss.types.DataTypeChecks.getPrecision;
-import static org.apache.fluss.types.DataTypeChecks.getScale;
-
-/** Flink Array Converter. */
-public class FlinkArrayConverter implements ArrayData {
-    private final ArrayData arrayData;
-
-    FlinkArrayConverter(DataType flussDataType, Object flussField) {
-        DataType eleType = ((ArrayType) flussDataType).getElementType();
-        this.arrayData = copyArray((org.apache.fluss.row.InternalArray) flussField, eleType);
-    }
-
-    private ArrayData copyArray(org.apache.fluss.row.InternalArray from, DataType eleType) {
-        FlussRowToFlinkRowConverter.FlussDeserializationConverter converter =
-                createInternalConverter(eleType);
-        if (!eleType.isNullable()) {
-            switch (eleType.getTypeRoot()) {
-                case BOOLEAN:
-                    return new GenericArrayData(from.toBooleanArray());
-                case TINYINT:
-                    return new GenericArrayData(from.toByteArray());
-                case SMALLINT:
-                    return new GenericArrayData(from.toShortArray());
-                case INTEGER:
-                case DATE:
-                case TIME_WITHOUT_TIME_ZONE:
-                    return new GenericArrayData(from.toIntArray());
-                case BIGINT:
-                    return new GenericArrayData(from.toLongArray());
-                case FLOAT:
-                    return new GenericArrayData(from.toFloatArray());
-                case DOUBLE:
-                    return new GenericArrayData(from.toDoubleArray());
-            }
-        }
-
-        Object[] newArray = new Object[from.size()];
-
-        for (int i = 0; i < newArray.length; ++i) {
-            if (!from.isNullAt(i)) {
-                newArray[i] = converter.deserialize(getFieldValue(from, i, eleType));
-            } else {
-                newArray[i] = null;
-            }
-        }
-
-        return new GenericArrayData(newArray);
-    }
-
-    @Override
-    public int size() {
-        return arrayData.size();
-    }
-
-    @Override
-    public boolean isNullAt(int i) {
-        return arrayData.isNullAt(i);
-    }
-
-    @Override
-    public boolean getBoolean(int i) {
-        return arrayData.getBoolean(i);
-    }
-
-    @Override
-    public byte getByte(int i) {
-        return arrayData.getByte(i);
-    }
-
-    @Override
-    public short getShort(int i) {
-        return arrayData.getShort(i);
-    }
-
-    @Override
-    public int getInt(int i) {
-        return arrayData.getInt(i);
-    }
-
-    @Override
-    public long getLong(int i) {
-        return arrayData.getLong(i);
-    }
-
-    @Override
-    public float getFloat(int i) {
-        return arrayData.getFloat(i);
-    }
-
-    @Override
-    public double getDouble(int i) {
-        return arrayData.getDouble(i);
-    }
-
-    @Override
-    public StringData getString(int i) {
-        return arrayData.getString(i);
-    }
-
-    @Override
-    public DecimalData getDecimal(int i, int i1, int i2) {
-        return arrayData.getDecimal(i, i1, i2);
-    }
-
-    @Override
-    public TimestampData getTimestamp(int i, int i1) {
-        return arrayData.getTimestamp(i, i1);
-    }
-
-    @Override
-    public <T> RawValueData<T> getRawValue(int i) {
-        return arrayData.getRawValue(i);
-    }
-
-    @Override
-    public byte[] getBinary(int i) {
-        return arrayData.getBinary(i);
-    }
-
-    @Override
-    public ArrayData getArray(int i) {
-        return arrayData.getArray(i);
-    }
-
-    @Override
-    public MapData getMap(int i) {
-        return arrayData.getMap(i);
-    }
-
-    @Override
-    public RowData getRow(int i, int i1) {
-        return arrayData.getRow(i, i1);
-    }
-
-    @Override
-    public boolean[] toBooleanArray() {
-        return arrayData.toBooleanArray();
-    }
-
-    @Override
-    public byte[] toByteArray() {
-        return arrayData.toByteArray();
-    }
-
-    @Override
-    public short[] toShortArray() {
-        return arrayData.toShortArray();
-    }
-
-    @Override
-    public int[] toIntArray() {
-        return arrayData.toIntArray();
-    }
-
-    @Override
-    public long[] toLongArray() {
-        return arrayData.toLongArray();
-    }
-
-    @Override
-    public float[] toFloatArray() {
-        return arrayData.toFloatArray();
-    }
-
-    @Override
-    public double[] toDoubleArray() {
-        return arrayData.toDoubleArray();
-    }
-
-    public ArrayData getArrayData() {
-        return arrayData;
-    }
-
-    public static ArrayData deserialize(DataType flussDataType, Object flussField) {
-        return new FlinkArrayConverter(flussDataType, flussField).getArrayData();
-    }
-
-    private static Object getFieldValue(InternalArray array, int pos, DataType dataType) {
-        if (array.isNullAt(pos)) {
-            return null;
-        }
-
-        switch (dataType.getTypeRoot()) {
-            case CHAR:
-                return array.getChar(pos, getLength(dataType));
-            case STRING:
-                return array.getString(pos);
-            case BOOLEAN:
-                return array.getBoolean(pos);
-            case BINARY:
-                return array.getBinary(pos, getLength(dataType));
-            case BYTES:
-                return array.getBytes(pos);
-            case DECIMAL:
-                return array.getDecimal(pos, getPrecision(dataType), getScale(dataType));
-            case TINYINT:
-                return array.getByte(pos);
-            case SMALLINT:
-                return array.getShort(pos);
-            case INTEGER:
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-                return array.getInt(pos);
-            case BIGINT:
-                return array.getLong(pos);
-            case FLOAT:
-                return array.getFloat(pos);
-            case DOUBLE:
-                return array.getDouble(pos);
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return array.getTimestampNtz(pos, getPrecision(dataType));
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return array.getTimestampLtz(pos, getPrecision(dataType));
-            case ARRAY:
-                return array.getArray(pos);
-                // TODO: Add Map type support in future
-            case MAP:
-                throw new UnsupportedOperationException("Map type not supported yet");
-                // TODO: Requires InternalArray.getRow() method from fluss-common Row type support
-            case ROW:
-                // return array.getRow(pos, getFieldCount(dataType));
-                throw new UnsupportedOperationException("Row type requires fluss-common update");
-            default:
-                throw new IllegalArgumentException("Unsupported data type: " + dataType);
-        }
-    }
-}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkRowConverter.java
deleted file mode 100644
index 279a96e23..000000000
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkRowConverter.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.utils;
-
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.RowType;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.types.RowKind;
-
-import static org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.createInternalConverter;
-import static org.apache.fluss.types.DataTypeChecks.getFieldCount;
-import static org.apache.fluss.types.DataTypeChecks.getLength;
-import static org.apache.fluss.types.DataTypeChecks.getPrecision;
-import static org.apache.fluss.types.DataTypeChecks.getScale;
-
-/** FlinkRowConverter. */
-public class FlinkRowConverter implements RowData {
-
-    private final RowData rowData;
-
-    FlinkRowConverter(DataType eleType, Object flussField) {
-        this.rowData = copyInternalRow((InternalRow) flussField, (RowType) eleType);
-    }
-
-    public RowData copyInternalRow(InternalRow row, RowType rowType) {
-        GenericRowData ret = new GenericRowData(row.getFieldCount());
-
-        for (int i = 0; i < row.getFieldCount(); ++i) {
-            DataType fieldType = rowType.getTypeAt(i);
-            FlussRowToFlinkRowConverter.FlussDeserializationConverter converter =
-                    createInternalConverter(fieldType);
-            ret.setField(i, converter.deserialize(getFieldValue(row, i, fieldType)));
-        }
-
-        return ret;
-    }
-
-    @Override
-    public int getArity() {
-        return rowData.getArity();
-    }
-
-    @Override
-    public RowKind getRowKind() {
-        return rowData.getRowKind();
-    }
-
-    @Override
-    public void setRowKind(RowKind rowKind) {
-        rowData.setRowKind(rowKind);
-    }
-
-    @Override
-    public boolean isNullAt(int i) {
-        return rowData.isNullAt(i);
-    }
-
-    @Override
-    public boolean getBoolean(int i) {
-        return rowData.getBoolean(i);
-    }
-
-    @Override
-    public byte getByte(int i) {
-        return rowData.getByte(i);
-    }
-
-    @Override
-    public short getShort(int i) {
-        return rowData.getShort(i);
-    }
-
-    @Override
-    public int getInt(int i) {
-        return rowData.getInt(i);
-    }
-
-    @Override
-    public long getLong(int i) {
-        return rowData.getLong(i);
-    }
-
-    @Override
-    public float getFloat(int i) {
-        return rowData.getFloat(i);
-    }
-
-    @Override
-    public double getDouble(int i) {
-        return rowData.getDouble(i);
-    }
-
-    @Override
-    public StringData getString(int i) {
-        return rowData.getString(i);
-    }
-
-    @Override
-    public DecimalData getDecimal(int i, int i1, int i2) {
-        return rowData.getDecimal(i, i1, i2);
-    }
-
-    @Override
-    public TimestampData getTimestamp(int i, int i1) {
-        return rowData.getTimestamp(i, i1);
-    }
-
-    @Override
-    public <T> RawValueData<T> getRawValue(int i) {
-        return rowData.getRawValue(i);
-    }
-
-    @Override
-    public byte[] getBinary(int i) {
-        return rowData.getBinary(i);
-    }
-
-    @Override
-    public ArrayData getArray(int i) {
-        return rowData.getArray(i);
-    }
-
-    @Override
-    public MapData getMap(int i) {
-        return rowData.getMap(i);
-    }
-
-    @Override
-    public RowData getRow(int i, int i1) {
-        return rowData.getRow(i, i1);
-    }
-
-    public RowData getRowData() {
-        return rowData;
-    }
-
-    public static RowData deserialize(DataType flussDataType, Object flussField) {
-        return new FlinkRowConverter(flussDataType, flussField).getRowData();
-    }
-
-    private static Object getFieldValue(InternalRow row, int pos, DataType dataType) {
-        if (row.isNullAt(pos)) {
-            return null;
-        }
-
-        switch (dataType.getTypeRoot()) {
-            case CHAR:
-                return row.getChar(pos, getLength(dataType));
-            case STRING:
-                return row.getString(pos);
-            case BOOLEAN:
-                return row.getBoolean(pos);
-            case BINARY:
-                return row.getBinary(pos, getLength(dataType));
-            case BYTES:
-                return row.getBytes(pos);
-            case DECIMAL:
-                return row.getDecimal(pos, getPrecision(dataType), getScale(dataType));
-            case TINYINT:
-                return row.getByte(pos);
-            case SMALLINT:
-                return row.getShort(pos);
-            case INTEGER:
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-                return row.getInt(pos);
-            case BIGINT:
-                return row.getLong(pos);
-            case FLOAT:
-                return row.getFloat(pos);
-            case DOUBLE:
-                return row.getDouble(pos);
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return row.getTimestampNtz(pos, getPrecision(dataType));
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return row.getTimestampLtz(pos, getPrecision(dataType));
-            case ARRAY:
-                return row.getArray(pos);
-                // TODO: Add Map type support in future
-            case MAP:
-                throw new UnsupportedOperationException("Map type not supported yet");
-            case ROW:
-                return row.getRow(pos, getFieldCount(dataType));
-            default:
-                throw new IllegalArgumentException("Unsupported data type: " + dataType);
-        }
-    }
-}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
index a0088f8e2..d27e66040 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java
@@ -20,13 +20,16 @@ package org.apache.fluss.flink.utils;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalArray;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
 
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -72,12 +75,12 @@ public class FlussRowToFlinkRowConverter {
     /**
      * Create a nullable runtime {@link FlussDeserializationConverter} from 
given {@link DataType}.
      */
-    protected FlussDeserializationConverter createNullableInternalConverter(
+    protected static FlussDeserializationConverter 
createNullableInternalConverter(
             DataType flussDataType) {
         return 
wrapIntoNullableInternalConverter(createInternalConverter(flussDataType));
     }
 
-    protected FlussDeserializationConverter wrapIntoNullableInternalConverter(
+    protected static FlussDeserializationConverter 
wrapIntoNullableInternalConverter(
             FlussDeserializationConverter flussDeserializationConverter) {
         return val -> {
             if (val == null) {
@@ -143,12 +146,43 @@ public class FlussRowToFlinkRowConverter {
                             timestampLtz.getNanoOfMillisecond());
                 };
             case ARRAY:
-                return (flussField) -> 
FlinkArrayConverter.deserialize(flussDataType, flussField);
+                ArrayType arrayType = (ArrayType) flussDataType;
+                InternalArray.ElementGetter elementGetter =
+                        
InternalArray.createElementGetter(arrayType.getElementType());
+                FlussDeserializationConverter elementConverter =
+                        
createNullableInternalConverter(arrayType.getElementType());
+                return (flussField) -> {
+                    InternalArray flussArray = (InternalArray) flussField;
+                    int size = flussArray.size();
+                    Object[] flinkArray = new Object[size];
+                    for (int i = 0; i < size; i++) {
+                        Object flussElement = 
elementGetter.getElementOrNull(flussArray, i);
+                        flinkArray[i] = 
elementConverter.deserialize(flussElement);
+                    }
+                    return new GenericArrayData(flinkArray);
+                };
             case MAP:
                 // TODO: Add Map type support in future
                 throw new UnsupportedOperationException("Map type not 
supported yet");
             case ROW:
-                return (flussField) -> 
FlinkRowConverter.deserialize(flussDataType, flussField);
+                RowType rowType = (RowType) flussDataType;
+                int fieldCount = rowType.getFieldCount();
+                InternalRow.FieldGetter[] fieldGetters = new 
InternalRow.FieldGetter[fieldCount];
+                FlussDeserializationConverter[] fieldConverters =
+                        new FlussDeserializationConverter[fieldCount];
+                for (int i = 0; i < fieldCount; i++) {
+                    fieldGetters[i] = 
InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
+                    fieldConverters[i] = 
createNullableInternalConverter(rowType.getTypeAt(i));
+                }
+                return (flussField) -> {
+                    InternalRow flussRow = (InternalRow) flussField;
+                    GenericRowData flinkRowData = new 
GenericRowData(fieldCount);
+                    for (int i = 0; i < fieldCount; i++) {
+                        Object flussFieldValue = 
fieldGetters[i].getFieldOrNull(flussRow);
+                        flinkRowData.setField(i, 
fieldConverters[i].deserialize(flussFieldValue));
+                    }
+                    return flinkRowData;
+                };
             default:
                 throw new UnsupportedOperationException("Unsupported data 
type: " + flussDataType);
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
index efa408a7f..7df298387 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
@@ -278,6 +278,32 @@ abstract class FlinkComplexTypeITCase extends 
AbstractTestBase {
         assertResultsIgnoreOrder(collected, expected, true);
     }
 
+    @Test
+    void testRowTypesInLogTable() throws Exception {
+        tEnv.executeSql(
+                "create table row_log_test ("
+                        + "id int, "
+                        + "simple_row row<a int, b string>, "
+                        + "nested_row row<x int, y row<z int, w string>, v 
string>, "
+                        + "array_of_rows array<row<a int, b string>>"
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO row_log_test VALUES "
+                                + "(1, ROW(10, 'hello'), ROW(20, ROW(30, 
'nested'), 'row1'), "
+                                + "ARRAY[ROW(1, 'a'), ROW(2, 'b')]), "
+                                + "(2, ROW(40, 'world'), ROW(50, ROW(60, 
'test'), 'row2'), "
+                                + "ARRAY[ROW(3, 'c')])")
+                .await();
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from 
row_log_test").collect();
+        List<String> expectedRows =
+                Arrays.asList(
+                        "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], 
[+I[1, a], +I[2, b]]]",
+                        "+I[2, +I[40, world], +I[50, +I[60, test], row2], 
[+I[3, c]]]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+    }
+
     @Test
     void testExceptionsForArrayTypeUsage() {
         assertThatThrownBy(
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java
index 05261968f..c9379d8c1 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkRowToFlussRowConverterTest.java
@@ -35,6 +35,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.time.LocalDateTime;
 
+import static org.apache.flink.table.data.binary.BinaryStringData.fromString;
 import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkRowType;
 import static org.apache.fluss.row.TestInternalRowGenerator.createAllRowType;
 import static org.apache.fluss.row.indexed.IndexedRowTest.assertAllTypeEquals;
@@ -51,7 +52,7 @@ public class FlinkRowToFlussRowConverterTest {
         try (FlinkRowToFlussRowConverter converter =
                 
FlinkRowToFlussRowConverter.create(toFlinkRowType(flussRowType))) {
             InternalRow internalRow = 
converter.toInternalRow(genRowDataForAllType());
-            assertThat(internalRow.getFieldCount()).isEqualTo(21);
+            assertThat(internalRow.getFieldCount()).isEqualTo(23);
             assertAllTypeEquals(internalRow);
         }
 
@@ -60,13 +61,13 @@ public class FlinkRowToFlussRowConverterTest {
                 FlinkRowToFlussRowConverter.create(
                         toFlinkRowType(flussRowType), KvFormat.COMPACTED)) {
             InternalRow internalRow = 
converter.toInternalRow(genRowDataForAllType());
-            assertThat(internalRow.getFieldCount()).isEqualTo(21);
+            assertThat(internalRow.getFieldCount()).isEqualTo(23);
             assertAllTypeEquals(internalRow);
         }
     }
 
     private static RowData genRowDataForAllType() {
-        GenericRowData genericRowData = new GenericRowData(21);
+        GenericRowData genericRowData = new GenericRowData(23);
         genericRowData.setField(0, true);
         genericRowData.setField(1, (byte) 2);
         genericRowData.setField(2, Short.parseShort("10"));
@@ -94,10 +95,24 @@ public class FlinkRowToFlussRowConverterTest {
         // 19: array
         genericRowData.setField(
                 19, new GenericArrayData(new Integer[] {1, 2, 3, 4, 5, -11, 
null, 444, 102234}));
+        genericRowData.setField(
+                20,
+                new GenericArrayData(
+                        new float[] {0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, 
Float.MIN_VALUE}));
+        genericRowData.setField(
+                21,
+                new GenericArrayData(
+                        new GenericArrayData[] {
+                            new GenericArrayData(
+                                    new StringData[] {fromString("a"), null, 
fromString("c")}),
+                            null,
+                            new GenericArrayData(
+                                    new StringData[] {fromString("hello"), 
fromString("world")})
+                        }));
 
-        // 20: row (nested row with fields: u1: INT, u2: ROW(v1: INT), u3: 
STRING)
+        // 22: row (nested row with fields: u1: INT, u2: ROW(v1: INT), u3: 
STRING)
         genericRowData.setField(
-                20, GenericRowData.of(123, GenericRowData.of(20), 
StringData.fromString("Test")));
+                22, GenericRowData.of(123, GenericRowData.of(22), 
StringData.fromString("Test")));
 
         return genericRowData;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java
index a149a7aea..06e48ffa2 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.utils;
 import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.flink.row.FlinkAsFlussArray;
 import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.indexed.IndexedRow;
 import org.apache.fluss.row.indexed.IndexedRowWriter;
 import org.apache.fluss.types.DataType;
@@ -36,6 +37,7 @@ import java.math.BigInteger;
 import java.time.LocalDate;
 import java.time.LocalTime;
 
+import static org.apache.fluss.row.BinaryString.fromString;
 import static org.apache.fluss.row.TestInternalRowGenerator.createAllRowType;
 import static org.apache.fluss.row.TestInternalRowGenerator.createAllTypes;
 import static org.apache.fluss.row.indexed.IndexedRowTest.genRecordForAllTypes;
@@ -100,12 +102,35 @@ class FlussRowToFlinkRowConverterTest {
                     new 
FlinkAsFlussArray(flinkRow.getArray(19)).toObjectArray(DataTypes.INT());
             assertThat(array1).isEqualTo(new Integer[] {1, 2, 3, 4, 5, -11, 
null, 444, 102234});
 
+            // array of float
+            Float[] array2 =
+                    new 
FlinkAsFlussArray(flinkRow.getArray(20)).toObjectArray(DataTypes.FLOAT());
+            assertThat(array2)
+                    .isEqualTo(
+                            new Float[] {
+                                0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, 
Float.MIN_VALUE
+                            });
+
+            // nested array: array of array of string
+            assertThat(flinkRow.getArray(21).size()).isEqualTo(3);
+            BinaryString[] stringArray1 =
+                    new FlinkAsFlussArray(flinkRow.getArray(21).getArray(0))
+                            .toObjectArray(DataTypes.STRING());
+            assertThat(stringArray1)
+                    .isEqualTo(new BinaryString[] {fromString("a"), null, 
fromString("c")});
+            assertThat(flinkRow.getArray(21).isNullAt(1)).isTrue();
+            BinaryString[] stringArray2 =
+                    new FlinkAsFlussArray(flinkRow.getArray(21).getArray(2))
+                            .toObjectArray(DataTypes.STRING());
+            assertThat(stringArray2)
+                    .isEqualTo(new BinaryString[] {fromString("hello"), 
fromString("world")});
+
             // nested row: ROW<u1: INT, u2: ROW<v1: INT>, u3: STRING>
-            assertThat(flinkRow.getRow(20, 3)).isNotNull();
-            assertThat(flinkRow.getRow(20, 3).getInt(0)).isEqualTo(123);
-            assertThat(flinkRow.getRow(20, 3).getRow(1, 1)).isNotNull();
-            assertThat(flinkRow.getRow(20, 3).getRow(1, 
1).getInt(0)).isEqualTo(20);
-            assertThat(flinkRow.getRow(20, 
3).getString(2).toString()).isEqualTo("Test");
+            assertThat(flinkRow.getRow(22, 3)).isNotNull();
+            assertThat(flinkRow.getRow(22, 3).getInt(0)).isEqualTo(123);
+            assertThat(flinkRow.getRow(22, 3).getRow(1, 1)).isNotNull();
+            assertThat(flinkRow.getRow(22, 3).getRow(1, 
1).getInt(0)).isEqualTo(22);
+            assertThat(flinkRow.getRow(22, 
3).getString(2).toString()).isEqualTo("Test");
         }
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index ac1755d44..dd76bd2ec 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -157,7 +157,6 @@ public class IcebergRecordAsFlussRow implements InternalRow 
{
 
     @Override
     public InternalRow getRow(int pos, int numFields) {
-        // TODO: Support Row type conversion from Iceberg to Fluss
-        return null;
+        throw new UnsupportedOperationException();
     }
 }
diff --git a/fluss-lake/fluss-lake-lance/pom.xml 
b/fluss-lake/fluss-lake-lance/pom.xml
index c4c35093f..058554dd3 100644
--- a/fluss-lake/fluss-lake-lance/pom.xml
+++ b/fluss-lake/fluss-lake-lance/pom.xml
@@ -208,14 +208,6 @@
                                     <include>*:*</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>*</artifact>
-                                    <excludes>
-                                        <exclude>LICENSE*</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/fluss-server/pom.xml b/fluss-server/pom.xml
index 3bd653f93..85e18b2c4 100644
--- a/fluss-server/pom.xml
+++ b/fluss-server/pom.xml
@@ -133,14 +133,6 @@
                                     <include>*:*</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>*</artifact>
-                                    <excludes>
-                                        <exclude>LICENSE*</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>org.apache.commons</pattern>
diff --git a/pom.xml b/pom.xml
index 247040141..80d4957c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -865,6 +865,9 @@
                                 <exclude>META-INF/*.SF</exclude>
                                 <exclude>META-INF/*.DSA</exclude>
                                 <exclude>META-INF/*.RSA</exclude>
+                                <!-- Exclude Eclipse license files from 
dependencies -->
+                                <exclude>LICENSE-EPL-1.0.txt</exclude>
+                                <exclude>LICENSE-EDL-1.0.txt</exclude>
                                 <!-- META-INF/maven can contain 2 things:
                                     - For archetypes, it contains an 
archetype-metadata.xml.
                                     - For other jars, it contains the pom for 
all dependencies under the respective <groupId>/<artifactId>/ directory.

Reply via email to