This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 256bd7f38 [common] Add field_id for Nested Row. (#2322)
256bd7f38 is described below

commit 256bd7f3838a72965720eb2a8c6c7c0575f46b07
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Jan 8 19:03:23 2026 +0800

    [common] Add field_id for Nested Row. (#2322)
---
 .../fluss/client/table/scanner/log/LogFetcher.java |   2 +-
 .../client/table/scanner/log/LogScannerImpl.java   |   2 +-
 .../fluss/client/admin/FlussAdminITCase.java       |  35 ++--
 .../scanner/log/DefaultCompletedFetchTest.java     |   6 +-
 .../scanner/log/RemoteCompletedFetchTest.java      |  10 +-
 .../java/org/apache/fluss/metadata/Schema.java     | 228 ++++++++++++++++-----
 .../org/apache/fluss/record/FileLogProjection.java |  49 +----
 .../apache/fluss/record/LogRecordReadContext.java  |  22 +-
 .../fluss/record/ProjectionPushdownCache.java      |   5 +-
 .../java/org/apache/fluss/types/DataField.java     |  24 ++-
 .../org/apache/fluss/types/DataTypeChecks.java     |  39 ++++
 .../java/org/apache/fluss/types/DataTypes.java     |  27 ++-
 .../org/apache/fluss/types/ReassignFieldId.java    |  74 +++++++
 .../main/java/org/apache/fluss/types/RowType.java  |   5 +
 .../java/org/apache/fluss/utils/Projection.java    |  84 +++-----
 .../apache/fluss/utils/json/DataTypeJsonSerde.java |  22 +-
 .../org/apache/fluss/metadata/TableSchemaTest.java | 137 +++++++++++++
 .../apache/fluss/record/FileLogProjectionTest.java |  51 ++++-
 .../java/org/apache/fluss/types/DataTypesTest.java |  12 +-
 .../org/apache/fluss/utils/ProjectionTest.java     |  26 +--
 .../fluss/utils/json/ColumnJsonSerdeTest.java      |  17 +-
 .../fluss/utils/json/DataTypeJsonSerdeTest.java    |  43 +++-
 .../fluss/utils/json/SchemaJsonSerdeTest.java      |  38 +++-
 .../apache/fluss/flink/utils/FlinkConversions.java |  20 +-
 .../fluss/flink/sink/FlinkComplexTypeITCase.java   |  75 +++++++
 .../fluss/flink/utils/FlinkConversionsTest.java    |  67 +++++-
 .../fluss/server/coordinator/SchemaUpdate.java     |  11 +-
 .../server/coordinator/TableManagerITCase.java     |  28 ++-
 .../fluss/server/tablet/TabletServiceITCase.java   |   2 +-
 29 files changed, 904 insertions(+), 257 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index a97bffac0..b5a03a1ca 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -530,7 +530,7 @@ public class LogFetcher implements Closeable {
                             assert projection != null;
                             reqForTable
                                     .setProjectionPushdownEnabled(true)
-                                    
.setProjectedFields(projection.getProjectionIdInOrder());
+                                    
.setProjectedFields(projection.getProjectionInOrder());
                         } else {
                             reqForTable.setProjectionPushdownEnabled(false);
                         }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index b50bc3622..ebcae05c5 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -123,7 +123,7 @@ public class LogScannerImpl implements LogScanner {
                                     + tableRowType);
                 }
             }
-            return Projection.of(projectedFields, tableInfo.getSchema());
+            return Projection.of(projectedFields);
         } else {
             return null;
         }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index a7fb7e6ac..aaf06592e 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -71,6 +71,7 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.ServerTags;
+import org.apache.fluss.types.DataTypeChecks;
 import org.apache.fluss.types.DataTypes;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -394,7 +395,12 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
 
         admin.alterTable(
                         tablePath,
-                        Collections.singletonList(
+                        Arrays.asList(
+                                TableChange.addColumn(
+                                        "nested_row",
+                                        DataTypes.ROW(DataTypes.STRING(), 
DataTypes.INT()),
+                                        "new nested column",
+                                        TableChange.ColumnPosition.last()),
                                 TableChange.addColumn(
                                         "c1",
                                         DataTypes.STRING(),
@@ -408,23 +414,28 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                         .primaryKey("id")
                         .fromColumns(
                                 Arrays.asList(
+                                        new Schema.Column("id", 
DataTypes.INT(), "person id", 0),
                                         new Schema.Column(
-                                                "id", DataTypes.INT(), "person 
id", (short) 0),
+                                                "name", DataTypes.STRING(), 
"person name", 1),
+                                        new Schema.Column("age", 
DataTypes.INT(), "person age", 2),
                                         new Schema.Column(
-                                                "name",
-                                                DataTypes.STRING(),
-                                                "person name",
-                                                (short) 1),
+                                                "nested_row",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "f0", 
DataTypes.STRING(), 4),
+                                                        DataTypes.FIELD("f1", 
DataTypes.INT(), 5)),
+                                                "new nested column",
+                                                3),
                                         new Schema.Column(
-                                                "age", DataTypes.INT(), 
"person age", (short) 2),
-                                        new Schema.Column(
-                                                "c1",
-                                                DataTypes.STRING(),
-                                                "new column c1",
-                                                (short) 3)))
+                                                "c1", DataTypes.STRING(), "new 
column c1", 6)))
                         .build();
         SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
         assertThat(schemaInfo).isEqualTo(new SchemaInfo(expectedSchema, 2));
+        // test field_id of rowType
+        assertThat(
+                        DataTypeChecks.equalsWithFieldId(
+                                schemaInfo.getSchema().getRowType(), 
expectedSchema.getRowType()))
+                .isTrue();
     }
 
     @Test
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
index 81463004c..9f50fe59e 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java
@@ -174,7 +174,7 @@ public class DefaultCompletedFetchTest {
         long fetchOffset = 0L;
         int bucketId = 0; // records for 0-10.
         TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
-        Projection projection = Projection.of(new int[] {0, 2}, schema);
+        Projection projection = Projection.of(new int[] {0, 2});
         MemoryLogRecords memoryLogRecords;
         if (logFormat == LogFormat.ARROW) {
             memoryLogRecords = genRecordsWithProjection(DATA2, projection, 
magic);
@@ -210,7 +210,7 @@ public class DefaultCompletedFetchTest {
         // test projection reorder.
         defaultCompletedFetch =
                 makeCompletedFetch(
-                        tb, resultForBucket0, fetchOffset, Projection.of(new 
int[] {2, 0}, schema));
+                        tb, resultForBucket0, fetchOffset, Projection.of(new 
int[] {2, 0}));
         scanRecords = defaultCompletedFetch.fetchRecords(8);
         assertThat(scanRecords.size()).isEqualTo(8);
         for (int i = 0; i < scanRecords.size(); i++) {
@@ -427,7 +427,7 @@ public class DefaultCompletedFetchTest {
                 DATA2_TABLE_ID,
                 testingSchemaGetter,
                 DEFAULT_COMPRESSION,
-                projection.getProjectionIdInOrder());
+                projection.getProjectionInOrder());
         ByteBuffer buffer =
                 toByteBuffer(
                         fileLogProjection
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java
index edc47a7ec..f3022c91b 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetchTest.java
@@ -226,10 +226,7 @@ class RemoteCompletedFetchTest {
                 createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, 
DATA2, logFormat);
         RemoteCompletedFetch completedFetch =
                 makeCompletedFetch(
-                        tableBucket,
-                        fileLogRecords,
-                        fetchOffset,
-                        Projection.of(new int[] {0, 2}, schema));
+                        tableBucket, fileLogRecords, fetchOffset, 
Projection.of(new int[] {0, 2}));
 
         List<ScanRecord> scanRecords = completedFetch.fetchRecords(8);
         List<Object[]> expectedObjects =
@@ -255,10 +252,7 @@ class RemoteCompletedFetchTest {
 
         completedFetch =
                 makeCompletedFetch(
-                        tableBucket,
-                        fileLogRecords,
-                        fetchOffset,
-                        Projection.of(new int[] {2, 0}, schema));
+                        tableBucket, fileLogRecords, fetchOffset, 
Projection.of(new int[] {2, 0}));
         scanRecords = completedFetch.fetchRecords(8);
         assertThat(scanRecords.size()).isEqualTo(8);
         for (int i = 0; i < scanRecords.size(); i++) {
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index 63066849b..21a3a3c93 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -19,9 +19,12 @@ package org.apache.fluss.metadata;
 
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.annotation.PublicStable;
+import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.DataField;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.ReassignFieldId;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.EncodingUtils;
 import org.apache.fluss.utils.StringUtils;
@@ -75,7 +78,8 @@ public final class Schema implements Serializable {
             @Nullable PrimaryKey primaryKey,
             int highestFieldId,
             List<String> autoIncrementColumnNames) {
-        this.columns = normalizeColumns(columns, primaryKey, 
autoIncrementColumnNames);
+        this.columns =
+                normalizeColumns(columns, primaryKey, 
autoIncrementColumnNames, highestFieldId);
         this.primaryKey = primaryKey;
         this.autoIncrementColumnNames = autoIncrementColumnNames;
         // pre-create the row type as it is the most frequently used part of 
the schema
@@ -85,7 +89,9 @@ public final class Schema implements Serializable {
                                 .map(
                                         column ->
                                                 new DataField(
-                                                        column.getName(), 
column.getDataType()))
+                                                        column.getName(),
+                                                        column.getDataType(),
+                                                        column.columnId))
                                 .collect(Collectors.toList()));
         this.highestFieldId = highestFieldId;
     }
@@ -248,34 +254,32 @@ public final class Schema implements Serializable {
             return this;
         }
 
-        public Builder highestFieldId(int highestFieldId) {
-            this.highestFieldId = new AtomicInteger(highestFieldId);
-            return this;
-        }
-
-        public Builder fromRowType(RowType rowType) {
-            checkNotNull(rowType, "rowType must not be null.");
-            final List<DataType> fieldDataTypes = rowType.getChildren();
-            final List<String> fieldNames = rowType.getFieldNames();
-            IntStream.range(0, fieldDataTypes.size())
-                    .forEach(i -> column(fieldNames.get(i), 
fieldDataTypes.get(i)));
-            return this;
-        }
-
-        /** Adopts the given field names and field data types as physical 
columns of the schema. */
-        public Builder fromFields(
-                List<String> fieldNames, List<? extends DataType> 
fieldDataTypes) {
-            checkNotNull(fieldNames, "Field names must not be null.");
-            checkNotNull(fieldDataTypes, "Field data types must not be null.");
-            checkArgument(
-                    fieldNames.size() == fieldDataTypes.size(),
-                    "Field names and field data types must have the same 
length.");
-            IntStream.range(0, fieldNames.size())
-                    .forEach(i -> column(fieldNames.get(i), 
fieldDataTypes.get(i)));
-            return this;
-        }
-
-        /** Adopts all columns from the given list. */
+        /**
+         * Adopts all columns from the given list.
+         *
+         * <p>This method directly uses the columns as-is, preserving their 
existing column IDs and
+         * all nested field IDs within their data types (e.g., field IDs in 
{@link RowType}, {@link
+         * ArrayType}, {@link MapType}). No field ID reassignment will occur.
+         *
+         * <p>This behavior is different from {@link #column(String, 
DataType)}, which automatically
+         * assigns new column IDs and reassigns all nested field IDs to ensure 
global uniqueness.
+         *
+         * <p>Use this method when:
+         *
+         * <ul>
+         *   <li>Loading existing schema from storage where IDs are already 
assigned
+         *   <li>Preserving schema identity during schema evolution
+         *   <li>Reconstructing schema from serialized format
+         * </ul>
+         *
+         * <p>Note: Either all input columns must have column IDs set, or none 
of them may have
+         * column IDs. Mixed states are not allowed.
+         *
+         * @param inputColumns the list of columns to adopt
+         * @return this builder for fluent API
+         * @throws IllegalStateException if columns have inconsistent column 
ID states (some set,
+         *     some not set)
+         */
         public Builder fromColumns(List<Column> inputColumns) {
             boolean nonSetColumnId =
                     inputColumns.stream()
@@ -289,9 +293,9 @@ public final class Schema implements Serializable {
 
             if (allSetColumnId) {
                 columns.addAll(inputColumns);
+                List<Integer> allFieldIds = collectAllFieldIds(inputColumns);
                 highestFieldId =
-                        new AtomicInteger(
-                                
columns.stream().mapToInt(Column::getColumnId).max().orElse(-1));
+                        new 
AtomicInteger(allFieldIds.stream().max(Integer::compareTo).orElse(-1));
             } else {
                 // if all columnId is not set, this maybe from old version 
schema. Just use its
                 // position as columnId.
@@ -310,6 +314,56 @@ public final class Schema implements Serializable {
             return this;
         }
 
+        public Builder highestFieldId(int highestFieldId) {
+            this.highestFieldId = new AtomicInteger(highestFieldId);
+            return this;
+        }
+
+        /**
+         * Adopts the field names and data types from the given {@link 
RowType} as physical columns
+         * of the schema.
+         *
+         * <p>This method internally calls {@link #column(String, DataType)} 
for each field, which
+         * means: The original field IDs in the RowType will be ignored and 
replaced with new ones.
+         * If you need to preserve existing field IDs, use {@link 
#fromColumns(List)} or {@link
+         * #fromSchema(Schema)} instead.
+         *
+         * @param rowType the row type to adopt fields from
+         * @return this builder for fluent API
+         */
+        public Builder fromRowType(RowType rowType) {
+            checkNotNull(rowType, "rowType must not be null.");
+            final List<DataType> fieldDataTypes = rowType.getChildren();
+            final List<String> fieldNames = rowType.getFieldNames();
+            IntStream.range(0, fieldDataTypes.size())
+                    .forEach(i -> column(fieldNames.get(i), 
fieldDataTypes.get(i)));
+            return this;
+        }
+
+        /**
+         * Adopts the given field names and field data types as physical 
columns of the schema.
+         *
+         * <p>This method internally calls {@link #column(String, DataType)} 
for each field, which
+         * means: Any field IDs nested in the given data types will be ignored and 
replaced with new ones.
+         * If you need to preserve existing field IDs, use {@link 
#fromColumns(List)} or {@link
+         * #fromSchema(Schema)} instead.
+         *
+         * @param fieldNames the list of field names
+         * @param fieldDataTypes the list of field data types
+         * @return this builder for fluent API
+         */
+        public Builder fromFields(
+                List<String> fieldNames, List<? extends DataType> 
fieldDataTypes) {
+            checkNotNull(fieldNames, "Field names must not be null.");
+            checkNotNull(fieldDataTypes, "Field data types must not be null.");
+            checkArgument(
+                    fieldNames.size() == fieldDataTypes.size(),
+                    "Field names and field data types must have the same 
length.");
+            IntStream.range(0, fieldNames.size())
+                    .forEach(i -> column(fieldNames.get(i), 
fieldDataTypes.get(i)));
+            return this;
+        }
+
         /**
          * Declares a column that is appended to this schema.
          *
@@ -317,13 +371,23 @@ public final class Schema implements Serializable {
          * and the order of fields in the data. Thus, columns represent the 
payload that is read
          * from and written to an external system.
          *
+         * <p>Note: If the data type contains nested types (e.g., {@link 
RowType}, {@link
+         * ArrayType}, {@link MapType}), all nested field IDs will be 
automatically reassigned to
+         * ensure global uniqueness. This is essential for schema evolution 
support. If you need to
+         * preserve existing field IDs, use {@link #fromColumns(List)} or 
{@link
+         * #fromSchema(Schema)} instead.
+         *
          * @param columnName column name
+         * @param dataType column data type
+         * @return this builder for fluent API
          */
         public Builder column(String columnName, DataType dataType) {
             checkNotNull(columnName, "Column name must not be null.");
             checkNotNull(dataType, "Data type must not be null.");
-            columns.add(
-                    new Column(columnName, dataType, null, 
highestFieldId.incrementAndGet(), null));
+            int id = highestFieldId.incrementAndGet();
+            // Reassign field id especially for nested types.
+            DataType reassignDataType = ReassignFieldId.reassign(dataType, 
highestFieldId);
+            columns.add(new Column(columnName, reassignDataType, null, id, 
null));
             return this;
         }
 
@@ -346,13 +410,10 @@ public final class Schema implements Serializable {
             checkNotNull(dataType, "Data type must not be null.");
             checkNotNull(aggFunction, "Aggregation function must not be 
null.");
 
-            columns.add(
-                    new Column(
-                            columnName,
-                            dataType,
-                            null,
-                            highestFieldId.incrementAndGet(),
-                            aggFunction));
+            int id = highestFieldId.incrementAndGet();
+            // Reassign field id especially for nested types.
+            DataType reassignDataType = ReassignFieldId.reassign(dataType, 
highestFieldId);
+            columns.add(new Column(columnName, reassignDataType, null, id, 
aggFunction));
             return this;
         }
 
@@ -441,17 +502,6 @@ public final class Schema implements Serializable {
 
         /** Returns an instance of an {@link Schema}. */
         public Schema build() {
-            Integer maximumColumnId =
-                    
columns.stream().map(Column::getColumnId).max(Integer::compareTo).orElse(0);
-
-            checkState(
-                    columns.isEmpty() || highestFieldId.get() >= 
maximumColumnId,
-                    "Highest field id must be greater than or equal to the 
maximum column id.");
-
-            checkState(
-                    
columns.stream().map(Column::getColumnId).distinct().count() == columns.size(),
-                    "Column ids must be unique.");
-
             return new Schema(columns, primaryKey, highestFieldId.get(), 
autoIncrementColumnNames);
         }
     }
@@ -629,8 +679,10 @@ public final class Schema implements Serializable {
     private static List<Column> normalizeColumns(
             List<Column> columns,
             @Nullable PrimaryKey primaryKey,
-            List<String> autoIncrementColumnNames) {
+            List<String> autoIncrementColumnNames,
+            int highestFieldId) {
 
+        checkFieldIds(columns, highestFieldId);
         List<String> columnNames =
                 
columns.stream().map(Column::getName).collect(Collectors.toList());
 
@@ -728,4 +780,74 @@ public final class Schema implements Serializable {
         }
         return new RowType(keyRowFields);
     }
+
+    /**
+     * Validates field IDs in the schema, including both top-level column IDs 
and nested field IDs.
+     *
+     * <p>This method performs the following checks:
+     *
+     * <ul>
+     *   <li>Ensures all top-level column IDs are unique
+     *   <li>Ensures all field IDs (including nested fields in ROW, ARRAY, MAP 
types) are globally
+     *       unique
+     *   <li>Verifies that the highest field ID is greater than or equal to 
all existing field IDs
+     * </ul>
+     *
+     * @param columns the list of columns to validate
+     * @param highestFieldId the highest field ID that should be greater than 
or equal to all field
+     *     IDs
+     * @throws IllegalStateException if any validation fails
+     */
+    private static void checkFieldIds(List<Column> columns, int 
highestFieldId) {
+
+        // Collect all field IDs (including nested fields) for validation
+        List<Integer> allFieldIds = collectAllFieldIds(columns);
+
+        // Validate all field IDs (including nested fields) are unique
+        long uniqueFieldIdsCount = allFieldIds.stream().distinct().count();
+        checkState(
+                uniqueFieldIdsCount == allFieldIds.size(),
+                "All field IDs (including nested fields) must be unique. Found 
%s unique IDs but expected %s.",
+                uniqueFieldIdsCount,
+                allFieldIds.size());
+
+        // Validate the highest field ID is greater than or equal to all field 
IDs
+        Integer maximumFieldId = 
allFieldIds.stream().max(Integer::compareTo).orElse(-1);
+        checkState(
+                columns.isEmpty() || highestFieldId >= maximumFieldId,
+                "Highest field ID (%s) must be greater than or equal to the 
maximum field ID (%s) including nested fields. Current columns is %s",
+                highestFieldId,
+                maximumFieldId,
+                columns);
+    }
+
+    /**
+     * Collects the ID of each given column and, recursively, the IDs of all nested 
fields in ROW, ARRAY,
+     * and MAP types.
+     */
+    private static List<Integer> collectAllFieldIds(List<Column> columns) {
+        List<Integer> allFieldIds = new ArrayList<>();
+        for (Column column : columns) {
+            allFieldIds.add(column.getColumnId());
+            collectAllFieldIds(column.getDataType(), allFieldIds);
+        }
+        return allFieldIds;
+    }
+
+    private static void collectAllFieldIds(DataType dataType, List<Integer> 
fieldIds) {
+        if (dataType instanceof RowType) {
+            RowType rowType = (RowType) dataType;
+            for (DataField field : rowType.getFields()) {
+                fieldIds.add(field.getFieldId());
+                collectAllFieldIds(field.getType(), fieldIds);
+            }
+        } else if (dataType instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) dataType;
+            collectAllFieldIds(arrayType.getElementType(), fieldIds);
+        } else if (dataType instanceof MapType) {
+            MapType mapType = (MapType) dataType;
+            collectAllFieldIds(mapType.getKeyType(), fieldIds);
+            collectAllFieldIds(mapType.getValueType(), fieldIds);
+        }
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java 
b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
index 9f227adb1..4717271c2 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java
@@ -51,9 +51,7 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import static 
org.apache.fluss.record.DefaultLogRecordBatch.APPEND_ONLY_FLAG_MASK;
 import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET;
@@ -101,7 +99,7 @@ public class FileLogProjection {
     private SchemaGetter schemaGetter;
     private long tableId;
     private ArrowCompressionInfo compressionInfo;
-    private int[] selectedFieldIds;
+    private int[] selectedFieldPositions;
 
     public FileLogProjection(ProjectionPushdownCache projectionsCache) {
         this.projectionsCache = projectionsCache;
@@ -117,11 +115,11 @@ public class FileLogProjection {
             long tableId,
             SchemaGetter schemaGetter,
             ArrowCompressionInfo compressionInfo,
-            int[] selectedFieldIds) {
+            int[] selectedFieldPositions) {
         this.tableId = tableId;
         this.schemaGetter = schemaGetter;
         this.compressionInfo = compressionInfo;
-        this.selectedFieldIds = selectedFieldIds;
+        this.selectedFieldPositions = selectedFieldPositions;
     }
 
     /**
@@ -405,19 +403,17 @@ public class FileLogProjection {
 
     private ProjectionInfo getOrCreateProjectionInfo(short schemaId) {
         ProjectionInfo cachedProjection =
-                projectionsCache.getProjectionInfo(tableId, schemaId, 
selectedFieldIds);
+                projectionsCache.getProjectionInfo(tableId, schemaId, 
selectedFieldPositions);
         if (cachedProjection == null) {
-            cachedProjection = createProjectionInfo(schemaId, 
selectedFieldIds);
+            cachedProjection = createProjectionInfo(schemaId, 
selectedFieldPositions);
             projectionsCache.setProjectionInfo(
-                    tableId, schemaId, selectedFieldIds, cachedProjection);
+                    tableId, schemaId, selectedFieldPositions, 
cachedProjection);
         }
         return cachedProjection;
     }
 
-    private ProjectionInfo createProjectionInfo(short schemaId, int[] 
selectedFieldIds) {
+    private ProjectionInfo createProjectionInfo(short schemaId, int[] 
selectedFieldPositions) {
         org.apache.fluss.metadata.Schema schema = 
schemaGetter.getSchema(schemaId);
-        int[] selectedFieldPositions =
-                selectedFieldPositions(schemaGetter.getSchema(schemaId), 
selectedFieldIds);
         RowType rowType = schema.getRowType();
 
         // initialize the projection util information
@@ -460,37 +456,6 @@ public class FileLogProjection {
                 selectedFieldPositions);
     }
 
-    int[] selectedFieldPositions(org.apache.fluss.metadata.Schema schema, 
int[] projectedFields) {
-        Map<Integer, Integer> columnIdPositions = new HashMap<>();
-        List<Integer> columnIds = schema.getColumnIds();
-        for (int i = 0; i < columnIds.size(); i++) {
-            columnIdPositions.put(columnIds.get(i), i);
-        }
-
-        int prev = -1;
-        int[] selectedFieldPositions = new int[projectedFields.length];
-        for (int i = 0; i < projectedFields.length; i++) {
-            int fieldId = projectedFields[i];
-            Integer position = columnIdPositions.get(fieldId);
-            if (position == null) {
-                throw new InvalidColumnProjectionException(
-                        String.format(
-                                "Projected field id %s is not contained in %s",
-                                fieldId, columnIds));
-            }
-
-            selectedFieldPositions[i] = position;
-            if (position < prev) {
-                throw new InvalidColumnProjectionException(
-                        "The projection indexes should be in field order, but 
is "
-                                + Arrays.toString(projectedFields));
-            }
-
-            prev = position;
-        }
-        return selectedFieldPositions;
-    }
-
     /** Projection pushdown information for a specific schema and selected 
fields. */
     public static final class ProjectionInfo {
         final BitSet nodesProjection;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 6d3ee99af..2b0f695ec 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -75,17 +75,14 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         int schemaId = tableInfo.getSchemaId();
         if (projection == null) {
             // set a default dummy projection to simplify code
-            projection =
-                    Projection.of(
-                            IntStream.range(0, 
rowType.getFieldCount()).toArray(),
-                            tableInfo.getSchema());
+            projection = Projection.of(IntStream.range(0, 
rowType.getFieldCount()).toArray());
         }
 
         if (logFormat == LogFormat.ARROW) {
             if (readFromRemote) {
                 // currently, for remote read, arrow log doesn't support 
projection pushdown,
                 // so set the rowType as is.
-                int[] selectedFields = projection.getProjectionPositions();
+                int[] selectedFields = projection.getProjection();
                 return createArrowReadContext(
                         rowType, schemaId, selectedFields, false, 
schemaGetter);
             } else {
@@ -101,10 +98,10 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
                         schemaGetter);
             }
         } else if (logFormat == LogFormat.INDEXED) {
-            int[] selectedFields = projection.getProjectionPositions();
+            int[] selectedFields = projection.getProjection();
             return createIndexedReadContext(rowType, schemaId, selectedFields, 
schemaGetter);
         } else if (logFormat == LogFormat.COMPACTED) {
-            int[] selectedFields = projection.getProjectionPositions();
+            int[] selectedFields = projection.getProjection();
             return createCompactedRowReadContext(rowType, schemaId, 
selectedFields);
         } else {
             throw new IllegalArgumentException("Unsupported log format: " + 
logFormat);
@@ -145,6 +142,17 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         return createArrowReadContext(rowType, schemaId, selectedFields, 
false, schemaGetter);
     }
 
+    @VisibleForTesting
+    public static LogRecordReadContext createArrowReadContext(
+            RowType rowType,
+            int schemaId,
+            SchemaGetter schemaGetter,
+            boolean projectionPushDowned) {
+        int[] selectedFields = IntStream.range(0, 
rowType.getFieldCount()).toArray();
+        return createArrowReadContext(
+                rowType, schemaId, selectedFields, projectionPushDowned, 
schemaGetter);
+    }
+
     /**
      * Creates a LogRecordReadContext for INDEXED log format.
      *
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
 
b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
index a2d3da1ae..71f12192b 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java
@@ -49,8 +49,9 @@ public class ProjectionPushdownCache {
     }
 
     @Nullable
-    public ProjectionInfo getProjectionInfo(long tableId, short schemaId, 
int[] selectedColumnIds) {
-        ProjectionKey key = new ProjectionKey(tableId, schemaId, 
selectedColumnIds);
+    public ProjectionInfo getProjectionInfo(
+            long tableId, short schemaId, int[] selectedFieldPositions) {
+        ProjectionKey key = new ProjectionKey(tableId, schemaId, 
selectedFieldPositions);
         return projectionCache.getIfPresent(key);
     }
 
diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataField.java 
b/fluss-common/src/main/java/org/apache/fluss/types/DataField.java
index c4f935680..296f6215b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/DataField.java
+++ b/fluss-common/src/main/java/org/apache/fluss/types/DataField.java
@@ -47,14 +47,25 @@ public class DataField implements Serializable {
 
     private final @Nullable String description;
 
-    public DataField(String name, DataType type, @Nullable String description) 
{
+    private final int fieldId;
+
+    public DataField(String name, DataType type, @Nullable String description, 
int fieldId) {
         this.name = checkNotNull(name, "Field name must not be null.");
         this.type = checkNotNull(type, "Field type must not be null.");
         this.description = description;
+        this.fieldId = fieldId;
+    }
+
+    public DataField(String name, DataType type, Integer fieldId) {
+        this(name, type, null, fieldId);
     }
 
     public DataField(String name, DataType type) {
-        this(name, type, null);
+        this(name, type, -1);
+    }
+
+    public DataField(String name, DataType type, @Nullable String description) 
{
+        this(name, type, description, -1);
     }
 
     public String getName() {
@@ -65,12 +76,16 @@ public class DataField implements Serializable {
         return type;
     }
 
+    public int getFieldId() {
+        return fieldId;
+    }
+
     public Optional<String> getDescription() {
         return Optional.ofNullable(description);
     }
 
     public DataField copy() {
-        return new DataField(name, type.copy(), description);
+        return new DataField(name, type.copy(), description, fieldId);
     }
 
     public String asSummaryString() {
@@ -90,6 +105,9 @@ public class DataField implements Serializable {
             return false;
         }
         DataField rowField = (DataField) o;
+        // ignore field id in equality check, because field id is not part of 
type definition,
+        // use RowType#equalsWithFieldId to compare types including field ids.
+        // we may ignore description too in the future.
         return name.equals(rowField.name)
                 && type.equals(rowField.type)
                 && Objects.equals(description, rowField.description);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java 
b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java
index 76e399568..15ec476a3 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java
+++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java
@@ -57,6 +57,11 @@ public final class DataTypeChecks {
         return dataType.accept(FIELD_TYPES_EXTRACTOR);
     }
 
+    /** Checks whether two data types are equal including field ids for row 
types. */
+    public static boolean equalsWithFieldId(DataType original, DataType that) {
+        return that.accept(new DataTypeEqualsWithFieldId(original));
+    }
+
     private DataTypeChecks() {
         // no instantiation
     }
@@ -155,4 +160,38 @@ public final class DataTypeChecks {
             return rowType.getFieldTypes();
         }
     }
+
+    private static class DataTypeEqualsWithFieldId extends 
DataTypeDefaultVisitor<Boolean> {
+        private final DataType original;
+
+        private DataTypeEqualsWithFieldId(DataType original) {
+            this.original = original;
+        }
+
+        @Override
+        public Boolean visit(RowType that) {
+            if (!original.equals(that)) {
+                return false;
+            }
+
+            // compare field ids.
+            List<DataField> originalFields = ((RowType) original).getFields();
+            List<DataField> thatFields = that.getFields();
+            for (int i = 0; i < that.getFieldCount(); i++) {
+                DataField originalField = originalFields.get(i);
+                DataField thatField = thatFields.get(i);
+                if (originalField.getFieldId() != thatField.getFieldId()
+                        || !equalsWithFieldId(originalField.getType(), 
thatField.getType())) {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        @Override
+        protected Boolean defaultMethod(DataType that) {
+            return original.equals(that);
+        }
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java 
b/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java
index 8993b146f..d0bc0d627 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java
+++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java
@@ -301,16 +301,39 @@ public class DataTypes {
         return new MapType(keyType, valueType);
     }
 
-    /** Field definition with field name and data type. */
     public static DataField FIELD(String name, DataType type) {
         return new DataField(name, type);
     }
 
-    /** Field definition with field name, data type, and a description. */
+    /**
+     * Creates a field definition with field name, data type, and field ID.
+     *
+     * @param name the field name
+     * @param type the field data type
+     * @param fieldId the field ID for schema evolution
+     * @return a new data field without description
+     */
+    public static DataField FIELD(String name, DataType type, int fieldId) {
+        return new DataField(name, type, fieldId);
+    }
+
     public static DataField FIELD(String name, DataType type, String 
description) {
         return new DataField(name, type, description);
     }
 
+    /**
+     * Creates a field definition with field name, data type, description, and 
field ID.
+     *
+     * @param name the field name
+     * @param type the field data type
+     * @param description the field description
+     * @param fieldId the field ID for schema evolution
+     * @return a new data field with all properties
+     */
+    public static DataField FIELD(String name, DataType type, String 
description, int fieldId) {
+        return new DataField(name, type, description, fieldId);
+    }
+
     /**
      * Data type of a sequence of fields. A field consists of a field name, 
field type, and an
      * optional description. The most specific type of a row of a table is a 
row type. In this case,
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java 
b/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java
new file mode 100644
index 000000000..3c095f6df
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/types/ReassignFieldId.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.types;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Visitor that recursively reassigns field IDs in nested data types using a 
provided counter. */
+public class ReassignFieldId extends DataTypeDefaultVisitor<DataType> {
+
+    private final AtomicInteger highestFieldId;
+
+    public ReassignFieldId(AtomicInteger highestFieldId) {
+        this.highestFieldId = highestFieldId;
+    }
+
+    public static DataType reassign(DataType input, AtomicInteger 
highestFieldId) {
+        return input.accept(new ReassignFieldId(highestFieldId));
+    }
+
+    @Override
+    public DataType visit(ArrayType arrayType) {
+        return new ArrayType(arrayType.isNullable(), 
arrayType.getElementType().accept(this));
+    }
+
+    @Override
+    public DataType visit(MapType mapType) {
+        return new MapType(
+                mapType.isNullable(),
+                mapType.getKeyType().accept(this),
+                mapType.getValueType().accept(this));
+    }
+
+    @Override
+    public DataType visit(RowType rowType) {
+        List<DataField> originalDataFields = rowType.getFields();
+
+        List<DataField> dataFields = new ArrayList<>();
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            DataField dataField = originalDataFields.get(i);
+            int id = highestFieldId.incrementAndGet();
+            DataType dataType = dataField.getType().accept(this);
+            dataFields.add(
+                    new DataField(
+                            dataField.getName(),
+                            dataType,
+                            dataField.getDescription().orElse(null),
+                            id));
+        }
+
+        return new RowType(rowType.isNullable(), dataFields);
+    }
+
+    @Override
+    protected DataType defaultMethod(DataType dataType) {
+        return dataType;
+    }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/types/RowType.java 
b/fluss-common/src/main/java/org/apache/fluss/types/RowType.java
index 090fe9655..fe2cfae05 100644
--- a/fluss-common/src/main/java/org/apache/fluss/types/RowType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/types/RowType.java
@@ -167,6 +167,11 @@ public final class RowType extends DataType {
         return fields.equals(rowType.fields);
     }
 
+    /** Checks whether two data types are equal including field ids for row 
types. */
+    public boolean equalsWithFieldId(DataType other) {
+        return DataTypeChecks.equalsWithFieldId(this, other);
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), fields);
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
index 7c392b055..c16c6b10b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
@@ -17,11 +17,9 @@
 
 package org.apache.fluss.utils;
 
-import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.types.RowType;
 
 import java.util.Arrays;
-import java.util.List;
 
 import static org.apache.fluss.utils.Preconditions.checkState;
 
@@ -30,14 +28,12 @@ import static 
org.apache.fluss.utils.Preconditions.checkState;
  * projection includes both reducing the accessible fields and reordering 
them. Currently, this only
  * supports top-level projection. Nested projection will be supported in the 
future.
  *
- * <p>For example, given a row with fields [a, b, c, d, e] with id [0, 1, 3, 
4, 5], a projection [2,
- * 0, 3] will project the row to [c, a, d] with id [3, 0, 4]. The projection 
indexes are 0-based.
+ * <p>For example, given a row with fields [a, b, c, d, e], a projection [2, 
0, 3] will project the
+ * row to [c, a, d]. The projection indexes are 0-based.
  *
  * <ul>
- *   <li>The {@link #projectionPositions} indexes will be [2, 0, 3]
- *   <li>The {@link #projectionPositions} id will be [3, 0, 4]
- *   <li>The {@link #projectionIdsInOrder} indexes will be [0, 3, 4]
- *   <li>The {@link #projectionPositionsOrderById} indexes will be [0, 2, 3]
+ *   <li>The {@link #projection} indexes will be [2, 0, 3]
+ *   <li>The {@link #projectionInOrder} indexes will be [0, 2, 3]
  *   <li>The {@link #reorderingIndexes} indexes will be [1, 0, 2]
  * </ul>
  *
@@ -45,76 +41,42 @@ import static 
org.apache.fluss.utils.Preconditions.checkState;
  */
 public class Projection {
     /** the projection indexes including both selected fields and reordering 
them. */
-    final int[] projectionPositions;
+    final int[] projection;
     /** the projection indexes that only select fields but not reordering 
them. */
-    final int[] projectionPositionsOrderById;
-    /** the projection indexes that only select fields but not reordering 
them. */
-    final int[] projectionIdsInOrder;
-    /**
-     * the indexes to reorder the fields of {@link 
#projectionPositionsOrderById} to {@link
-     * #projectionPositions}.
-     */
+    final int[] projectionInOrder;
+    /** the indexes to reorder the fields of {@link #projectionInOrder} to 
{@link #projection}. */
     final int[] reorderingIndexes;
     /** the flag to indicate whether reordering is needed. */
     final boolean reorderingNeeded;
 
-    public static Projection of(int[] indexes, Schema schema) {
-        int[] ids = new int[indexes.length];
-        List<Integer> columnIds = schema.getColumnIds();
-        for (int i = 0; i < indexes.length; i++) {
-            ids[i] = columnIds.get(indexes[i]);
-        }
-
-        return new Projection(indexes, ids);
+    /** Create a {@link Projection} of the provided {@code indexes}. */
+    public static Projection of(int[] indexes) {
+        return new Projection(indexes);
     }
 
-    /**
-     * Create a {@link Projection} of the provided {@code indexes} and {@code 
projectionIds}.
-     *
-     * <p>Typically, {@code projectionIndexes} and {@code projectionIds} are 
the same. But when
-     * removing a middle column or reordering columns, they won't be the same. 
In this case, when
-     * the schema on the fluss server side differs from the schema on the 
client side during user
-     * queries, using {@code projectionIds} ensures correctness by mapping to 
the actual server-side
-     * column positions.
-     *
-     * @param projectionIndexes the indexes of the projection, which is used 
for user to read data
-     *     from client side. These indexes are based on the schema visible to 
the client side and
-     *     are used to parse the data returned by the fluss server.
-     * @param projectionIds the ids of the projection, which is used for fluss 
server scan. These
-     *     ids are based on the actual schema on the fluss server side and are 
pushed down to the
-     *     server for querying.
-     */
-    private Projection(int[] projectionIndexes, int[] projectionIds) {
-        checkState(
-                projectionIds.length == projectionIndexes.length,
-                "The number of ids must be equal to the number of indexes.");
-        this.projectionPositions = projectionIndexes;
-
-        // reorder the projection id for lookup.
-        this.projectionIdsInOrder = Arrays.copyOf(projectionIds, 
projectionIds.length);
-        Arrays.sort(projectionIdsInOrder);
-        this.reorderingNeeded = !Arrays.equals(projectionIds, 
projectionIdsInOrder);
-        this.reorderingIndexes = new int[projectionPositions.length];
-        this.projectionPositionsOrderById = new 
int[projectionPositions.length];
-        for (int i = 0; i < projectionIds.length; i++) {
-            int index = Arrays.binarySearch(projectionIdsInOrder, 
projectionIds[i]);
+    private Projection(int[] projection) {
+        this.projection = projection;
+        this.projectionInOrder = Arrays.copyOf(projection, projection.length);
+        Arrays.sort(projectionInOrder);
+        this.reorderingNeeded = !Arrays.equals(projection, projectionInOrder);
+        this.reorderingIndexes = new int[projection.length];
+        for (int i = 0; i < projection.length; i++) {
+            int index = Arrays.binarySearch(projectionInOrder, projection[i]);
             checkState(index >= 0, "The projection index is invalid.");
             reorderingIndexes[i] = index;
-            projectionPositionsOrderById[i] = projectionIndexes[index];
         }
     }
 
     public RowType projectInOrder(RowType rowType) {
-        return rowType.project(projectionPositionsOrderById);
+        return rowType.project(projectionInOrder);
     }
 
-    public int[] getProjectionPositions() {
-        return projectionPositions;
+    public int[] getProjection() {
+        return projection;
     }
 
-    /** The id of the projection, which is used for fluss server scan. */
-    public int[] getProjectionIdInOrder() {
-        return projectionIdsInOrder;
+    public int[] getProjectionInOrder() {
+        return projectionInOrder;
     }
 
     public boolean isReorderingNeeded() {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java
index 9bcd3262b..12e6fb1c5 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java
@@ -70,6 +70,7 @@ public class DataTypeJsonSerde implements 
JsonSerializer<DataType>, JsonDeserial
     static final String FIELD_NAME_FIELDS = "fields";
     static final String FIELD_NAME_FIELD_NAME = "name";
     static final String FIELD_NAME_FIELD_TYPE = "field_type";
+    static final String FIELD_NAME_FIELD_ID = "field_id";
     static final String FIELD_NAME_FIELD_DESCRIPTION = "description";
 
     @Override
@@ -190,6 +191,8 @@ public class DataTypeJsonSerde implements 
JsonSerializer<DataType>, JsonDeserial
                 jsonGenerator.writeStringField(
                         FIELD_NAME_FIELD_DESCRIPTION, 
dataField.getDescription().get());
             }
+
+            jsonGenerator.writeNumberField(FIELD_NAME_FIELD_ID, 
dataField.getFieldId());
             jsonGenerator.writeEndObject();
         }
         jsonGenerator.writeEndArray();
@@ -291,18 +294,23 @@ public class DataTypeJsonSerde implements 
JsonSerializer<DataType>, JsonDeserial
     private static DataType deserializeRow(JsonNode dataTypeNode) {
         final ArrayNode fieldNodes = (ArrayNode) 
dataTypeNode.get(FIELD_NAME_FIELDS);
         final List<DataField> fields = new ArrayList<>();
+
         for (JsonNode fieldNode : fieldNodes) {
             final String fieldName = 
fieldNode.get(FIELD_NAME_FIELD_NAME).asText();
             final DataType fieldType =
                     
DataTypeJsonSerde.INSTANCE.deserialize(fieldNode.get(FIELD_NAME_FIELD_TYPE));
-            final String fieldDescription;
-            if (fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)) {
-                fieldDescription = 
fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText();
-            } else {
-                fieldDescription = null;
-            }
-            fields.add(new DataField(fieldName, fieldType, fieldDescription));
+            final String fieldDescription =
+                    fieldNode.has(FIELD_NAME_FIELD_DESCRIPTION)
+                            ? 
fieldNode.get(FIELD_NAME_FIELD_DESCRIPTION).asText()
+                            : null;
+
+            final int fieldId =
+                    fieldNode.has(FIELD_NAME_FIELD_ID)
+                            ? fieldNode.get(FIELD_NAME_FIELD_ID).asInt()
+                            : -1;
+            fields.add(new DataField(fieldName, fieldType, fieldDescription, 
fieldId));
         }
+
         return new RowType(fields);
     }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java 
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
index 32ba62b06..82e2fd76e 100644
--- a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
@@ -18,10 +18,13 @@
 package org.apache.fluss.metadata;
 
 import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -122,6 +125,140 @@ class TableSchemaTest {
                 .hasMessage("Auto increment column can only be used in 
primary-key table.");
     }
 
+    @Test
+    void testReassignFieldId() {
+        // Schema.Builder.column will reassign field ids in flattened order.
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.STRING().copy(false))
+                        .column(
+                                "f1",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 0),
+                                        DataTypes.FIELD("n1", 
DataTypes.STRING(), 1),
+                                        DataTypes.FIELD(
+                                                "n2",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "m1", 
DataTypes.TINYINT(), 0)),
+                                                2)))
+                        .column(
+                                "f2",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 0),
+                                        DataTypes.FIELD("n1", 
DataTypes.STRING(), 1)))
+                        .column("f3", DataTypes.STRING())
+                        .primaryKey("f0")
+                        .build();
+        assertThat(schema.getColumnIds()).containsExactly(0, 1, 6, 9);
+        RowType expectedType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                DataTypes.FIELD("f0", 
DataTypes.STRING().copy(false), 0),
+                                DataTypes.FIELD(
+                                        "f1",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 2),
+                                                DataTypes.FIELD("n1", 
DataTypes.STRING(), 3),
+                                                DataTypes.FIELD(
+                                                        "n2",
+                                                        DataTypes.ROW(
+                                                                
DataTypes.FIELD(
+                                                                        "m1",
+                                                                        
DataTypes.TINYINT(),
+                                                                        5)),
+                                                        4)),
+                                        1),
+                                DataTypes.FIELD(
+                                        "f2",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 7),
+                                                DataTypes.FIELD("n1", 
DataTypes.STRING(), 8)),
+                                        6),
+                                DataTypes.FIELD("f3", DataTypes.STRING(), 9)));
+        
assertThat(schema.getRowType().equalsWithFieldId(expectedType)).isTrue();
+
+        // Schema.Builder.fromColumns won't reassign field id.
+        List<Schema.Column> columns =
+                Arrays.asList(
+                        new Schema.Column("f0", 
DataTypes.STRING().copy(false), null, 0),
+                        new Schema.Column(
+                                "f1",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 0),
+                                        DataTypes.FIELD("n1", 
DataTypes.STRING(), 1),
+                                        DataTypes.FIELD(
+                                                "n2",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "m1", 
DataTypes.TINYINT(), 1)),
+                                                2)),
+                                null,
+                                1),
+                        new Schema.Column(
+                                "f2",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 0),
+                                        DataTypes.FIELD("n1", 
DataTypes.STRING(), 1)),
+                                null,
+                                2));
+        assertThatThrownBy(() -> 
Schema.newBuilder().fromColumns(columns).build())
+                .hasMessageContaining(
+                        "All field IDs (including nested fields) must be 
unique. Found 3 unique IDs but expected 9");
+        List<Schema.Column> columns2 =
+                Arrays.asList(
+                        new Schema.Column("f0", 
DataTypes.STRING().copy(false), null, 0),
+                        new Schema.Column(
+                                "f1",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 6),
+                                        DataTypes.FIELD("n1", 
DataTypes.STRING(), 7),
+                                        DataTypes.FIELD(
+                                                "n2",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "m1", 
DataTypes.TINYINT(), 11)),
+                                                8)),
+                                null,
+                                1),
+                        new Schema.Column(
+                                "f2",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 9),
+                                        DataTypes.FIELD("n1", 
DataTypes.STRING(), 10)),
+                                null,
+                                2));
+        schema = Schema.newBuilder().fromColumns(columns2).build();
+        assertThat(schema.getColumnIds()).containsExactly(0, 1, 2);
+        expectedType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                DataTypes.FIELD("f0", 
DataTypes.STRING().copy(false), 0),
+                                DataTypes.FIELD(
+                                        "f1",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 6),
+                                                DataTypes.FIELD("n1", 
DataTypes.STRING(), 7),
+                                                DataTypes.FIELD(
+                                                        "n2",
+                                                        DataTypes.ROW(
+                                                                
DataTypes.FIELD(
+                                                                        "m1",
+                                                                        
DataTypes.TINYINT(),
+                                                                        11)),
+                                                        8)),
+                                        1),
+                                DataTypes.FIELD(
+                                        "f2",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("n0", 
DataTypes.TINYINT(), 9),
+                                                DataTypes.FIELD("n1", 
DataTypes.STRING(), 10)),
+                                        2)));
+        
assertThat(schema.getRowType().equalsWithFieldId(expectedType)).isTrue();
+    }
+
     @Test
     void testSchemaBuilderColumnWithAggFunction() {
         Schema schema =
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java 
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
index 9a27089ac..b85e4db32 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java
@@ -134,7 +134,7 @@ class FileLogProjectionTest {
                                         new int[] {3},
                                         recordsOfData2RowType.sizeInBytes()))
                 .isInstanceOf(InvalidColumnProjectionException.class)
-                .hasMessage("Projected field id 3 is not contained in [0, 1, 
2]");
+                .hasMessage("Projected fields [3] is out of bound for schema 
with 3 fields.");
 
         assertThatThrownBy(
                         () ->
@@ -162,6 +162,51 @@ class FileLogProjectionTest {
                         "The projection indexes should not contain duplicated 
fields, but is [0, 0, 0]");
     }
 
+    @Test
+    void testProjectionOldDataWithNewSchema() throws Exception {
+        // Currently, we only support adding columns at the end of the schema.
+        short schemaId = 1;
+        try (FileLogRecords records =
+                createFileLogRecords(
+                        schemaId, LOG_MAGIC_VALUE_V1, TestData.DATA1_ROW_TYPE, 
TestData.DATA1)) {
+
+            ProjectionPushdownCache cache = new ProjectionPushdownCache();
+            FileLogProjection projection = new FileLogProjection(cache);
+            assertThat(
+                            doProjection(
+                                    2L,
+                                    2,
+                                    projection,
+                                    records,
+                                    new int[] {1},
+                                    records.sizeInBytes()))
+                    .containsExactly(
+                            new Object[] {"a"},
+                            new Object[] {"b"},
+                            new Object[] {"c"},
+                            new Object[] {"d"},
+                            new Object[] {"e"},
+                            new Object[] {"f"},
+                            new Object[] {"g"},
+                            new Object[] {"h"},
+                            new Object[] {"i"},
+                            new Object[] {"j"});
+
+            assertThatThrownBy(
+                            () ->
+                                    doProjection(
+                                            1L,
+                                            2,
+                                            projection,
+                                            records,
+                                            new int[] {0, 2},
+                                            records.sizeInBytes()))
+                    .isInstanceOf(InvalidColumnProjectionException.class)
+                    .hasMessage(
+                            "Projected fields [0, 2] is out of bound for 
schema with 2 fields.");
+        }
+    }
+
     static Stream<Arguments> projectedFieldsArgs() {
         return Stream.of(
                 Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V0, 
(short) 1),
@@ -420,7 +465,7 @@ class FileLogProjectionTest {
 
     private List<Object[]> doProjection(
             long tableId,
-            short schemaId,
+            int schemaId,
             FileLogProjection projection,
             FileLogRecords fileLogRecords,
             int[] projectedFields,
@@ -437,7 +482,7 @@ class FileLogProjectionTest {
         List<Object[]> results = new ArrayList<>();
         long expectedOffset = 0L;
         try (LogRecordReadContext context =
-                createArrowReadContext(projectedType, schemaId, 
testingSchemaGetter)) {
+                createArrowReadContext(projectedType, schemaId, 
testingSchemaGetter, true)) {
             for (LogRecordBatch batch : project.batches()) {
                 try (CloseableIterator<LogRecord> records = 
batch.records(context)) {
                     while (records.hasNext()) {
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java 
b/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java
index e9f0dc8cc..309f0fabd 100644
--- a/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/types/DataTypesTest.java
@@ -608,6 +608,16 @@ public class DataTypesTest {
                                 new DataField("a1", new IntType(), "column 
a1"),
                                 new DataField("b", new CharType(5), "column 
b"))));
 
+        // test ignore field_id
+        dataTypeBaseAssert(
+                rowType,
+                true,
+                "ROW<`a` INT 'column a', `b` CHAR(5) 'column b'>",
+                new RowType(
+                        Arrays.asList(
+                                new DataField("a1", new IntType(), "column 
a1"),
+                                new DataField("b", new CharType(5), "column 
b"))));
+
         rowType =
                 new RowType(
                         false,
@@ -654,7 +664,7 @@ public class DataTypesTest {
                 new RowType(
                         Arrays.asList(
                                 new DataField("a1", new IntType(), "column 
a1"),
-                                new DataField("b", new CharType(5)))));
+                                new DataField("b", new CharType(5), 1))));
 
         rowType = RowType.of(DataTypes.CHAR(1), DataTypes.CHAR(2));
         dataTypeBaseAssert(
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java 
b/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java
index 48fc2ac67..4f85e9516 100644
--- a/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/utils/ProjectionTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.fluss.utils;
 
-import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.ProjectedRow;
@@ -33,15 +32,8 @@ class ProjectionTest {
 
     @Test
     void testProjection() {
-        Schema schema =
-                Schema.newBuilder()
-                        .column("f0", DataTypes.INT())
-                        .column("f1", DataTypes.BIGINT())
-                        .column("f2", DataTypes.STRING())
-                        .column("f3", DataTypes.DOUBLE())
-                        .build();
-        Projection projection = Projection.of(new int[] {2, 0, 3}, schema);
-        assertThat(projection.getProjectionIdInOrder()).isEqualTo(new int[] 
{0, 2, 3});
+        Projection projection = Projection.of(new int[] {2, 0, 3});
+        assertThat(projection.getProjectionInOrder()).isEqualTo(new int[] {0, 
2, 3});
 
         RowType rowType =
                 projection.projectInOrder(
@@ -53,21 +45,21 @@ class ProjectionTest {
         assertThat(rowType)
                 .isEqualTo(
                         DataTypes.ROW(
-                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                DataTypes.FIELD("f2", DataTypes.STRING()),
-                                DataTypes.FIELD("f3", DataTypes.DOUBLE())));
+                                DataTypes.FIELD("f0", DataTypes.INT(), 0),
+                                DataTypes.FIELD("f2", DataTypes.STRING(), 2),
+                                DataTypes.FIELD("f3", DataTypes.DOUBLE(), 3)));
 
         assertThat(projection.isReorderingNeeded()).isTrue();
         assertThat(projection.getReorderingIndexes()).isEqualTo(new int[] {1, 
0, 2});
         assertThat(rowType.project(projection.getReorderingIndexes()))
                 .isEqualTo(
                         DataTypes.ROW(
-                                DataTypes.FIELD("f2", DataTypes.STRING()),
-                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                DataTypes.FIELD("f3", DataTypes.DOUBLE())));
+                                DataTypes.FIELD("f2", DataTypes.STRING(), 2),
+                                DataTypes.FIELD("f0", DataTypes.INT(), 0),
+                                DataTypes.FIELD("f3", DataTypes.DOUBLE(), 3)));
 
         GenericRow row = GenericRow.of(0, 1L, BinaryString.fromString("2"), 
3.0d);
-        ProjectedRow p1 = 
ProjectedRow.from(projection.getProjectionIdInOrder());
+        ProjectedRow p1 = ProjectedRow.from(projection.getProjectionInOrder());
         p1.replaceRow(row);
         ProjectedRow p2 = ProjectedRow.from(projection.getReorderingIndexes());
         p2.replaceRow(p1);
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java
index 716e04347..eba8159fb 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/ColumnJsonSerdeTest.java
@@ -20,11 +20,13 @@ package org.apache.fluss.utils.json;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -36,11 +38,21 @@ public class ColumnJsonSerdeTest extends 
JsonSerdeTestBase<Schema.Column> {
 
     @Override
     protected Schema.Column[] createObjects() {
-        Schema.Column[] columns = new Schema.Column[4];
+        Schema.Column[] columns = new Schema.Column[5];
         columns[0] = new Schema.Column("a", DataTypes.STRING());
         columns[1] = new Schema.Column("b", DataTypes.INT(), "hello b");
         columns[2] = new Schema.Column("c", new IntType(false), "hello c");
         columns[3] = new Schema.Column("d", new IntType(false), "hello c", 
(short) 2);
+        columns[4] =
+                new Schema.Column(
+                        "e",
+                        new RowType(
+                                true,
+                                Arrays.asList(
+                                        DataTypes.FIELD("f", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("g", 
DataTypes.STRING(), 1))),
+                        "hello c",
+                        (short) 2);
         return columns;
     }
 
@@ -50,7 +62,8 @@ public class ColumnJsonSerdeTest extends 
JsonSerdeTestBase<Schema.Column> {
             "{\"name\":\"a\",\"data_type\":{\"type\":\"STRING\"},\"id\":-1}",
             
"{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"hello 
b\",\"id\":-1}",
             
"{\"name\":\"c\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello
 c\",\"id\":-1}",
-            
"{\"name\":\"d\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello
 c\",\"id\":2}"
+            
"{\"name\":\"d\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"hello
 c\",\"id\":2}",
+            
"{\"name\":\"e\",\"data_type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"f\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":-1},{\"name\":\"g\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":1}]},\"comment\":\"hello
 c\",\"id\":2}"
         };
     }
 
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java
index 82d6ca4e8..20e338f60 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java
@@ -24,6 +24,8 @@ import org.apache.fluss.types.BooleanType;
 import org.apache.fluss.types.BytesType;
 import org.apache.fluss.types.CharType;
 import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
+import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.DateType;
 import org.apache.fluss.types.DecimalType;
 import org.apache.fluss.types.DoubleType;
@@ -38,10 +40,15 @@ import org.apache.fluss.types.TimeType;
 import org.apache.fluss.types.TimestampType;
 import org.apache.fluss.types.TinyIntType;
 
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test for {@link DataTypeJsonSerde}. */
 public class DataTypeJsonSerdeTest extends JsonSerdeTestBase<DataType> {
 
@@ -49,6 +56,12 @@ public class DataTypeJsonSerdeTest extends 
JsonSerdeTestBase<DataType> {
         super(DataTypeJsonSerde.INSTANCE);
     }
 
+    @Override
+    protected void assertEquals(DataType actual, DataType expected) {
+        // compare with field_id.
+        assertThat(DataTypeChecks.equalsWithFieldId(actual, 
expected)).isTrue();
+    }
+
     @Override
     protected DataType[] createObjects() {
         final List<DataType> types =
@@ -77,7 +90,12 @@ public class DataTypeJsonSerdeTest extends 
JsonSerdeTestBase<DataType> {
                         new LocalZonedTimestampType(3),
                         new ArrayType(new IntType(false)),
                         new MapType(new BigIntType(false), new IntType(false)),
-                        RowType.of(new BigIntType(), new IntType(false), new 
StringType()));
+                        new RowType(
+                                true,
+                                Arrays.asList(
+                                        DataTypes.FIELD("f0", new 
BigIntType(), null),
+                                        DataTypes.FIELD("f1", new 
IntType(false), null, 1),
+                                        DataTypes.FIELD("f2", new 
StringType(), null, 2))));
 
         final List<DataType> allTypes = new ArrayList<>();
         // consider nullable
@@ -139,8 +157,27 @@ public class DataTypeJsonSerdeTest extends 
JsonSerdeTestBase<DataType> {
             
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
             
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
             
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
-            
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
-            
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
+            
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":-1},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}",
+            
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"},\"field_id\":-1},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}"
         };
     }
+
+    @Test
+    void testJsonLackOfFieldId() {
+        // Some fields carry a field_id while others do not.
+        String testJsonWithInconsistencyFieldId =
+                
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"field_id\":1},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"},\"field_id\":2}]}";
+        DataType dataType =
+                JsonSerdeUtils.readValue(
+                        
testJsonWithInconsistencyFieldId.getBytes(StandardCharsets.UTF_8),
+                        DataTypeJsonSerde.INSTANCE);
+        assertThat(dataType).isInstanceOf(RowType.class);
+        assertEquals(
+                dataType,
+                DataTypes.ROW(
+                                DataTypes.FIELD("f0", DataTypes.BIGINT()),
+                                DataTypes.FIELD("f1", 
DataTypes.INT().copy(false), 1),
+                                DataTypes.FIELD("f2", DataTypes.STRING(), 2))
+                        .copy(false));
+    }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
index 018214b3f..54c5f69ea 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
@@ -79,6 +79,18 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
                     .enableAutoIncrement("b")
                     .build();
 
+    static final Schema SCHEMA_5 =
+            Schema.newBuilder()
+                    .column("a", DataTypes.INT())
+                    .withComment("a is first column")
+                    .column(
+                            "b",
+                            DataTypes.ROW(
+                                    DataTypes.FIELD("c", DataTypes.INT(), "a 
is first column", 0),
+                                    DataTypes.FIELD("d", DataTypes.INT(), "a 
is first column", 1)))
+                    .withComment("b is second column")
+                    .build();
+
     static final String SCHEMA_JSON_0 =
             
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
 is third 
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"highest_field_id\":2}";
     static final String SCHEMA_JSON_1 =
@@ -90,6 +102,9 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
     static final String SCHEMA_JSON_4 =
             
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
 is third 
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
 
+    static final String SCHEMA_JSON_5 =
+            
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"ROW\",\"fields\":[{\"name\":\"c\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a
 is first 
column\",\"field_id\":2},{\"name\":\"d\",\"field_type\":{\"type\":\"INTEGER\"},\"description\":\"a
 is first column\",\"field_id\":3}]},\"comment\":\"b is second 
column\",\"id\":1}],\"highest_field_id\":3}";
+
     static final Schema SCHEMA_WITH_AGG =
             Schema.newBuilder()
                     .column("product_id", DataTypes.BIGINT().copy(false))
@@ -112,9 +127,18 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
         super(SchemaJsonSerde.INSTANCE);
     }
 
+    @Override
+    protected void assertEquals(Schema actual, Schema expected) {
+        assertThat(actual).isEqualTo(expected);
+        // compare field ids.
+        
assertThat(actual.getRowType().equalsWithFieldId(expected.getRowType())).isTrue();
+    }
+
     @Override
     protected Schema[] createObjects() {
-        return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4, 
SCHEMA_WITH_AGG};
+        return new Schema[] {
+            SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4, SCHEMA_5, 
SCHEMA_WITH_AGG
+        };
     }
 
     @Override
@@ -125,6 +149,7 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
             SCHEMA_JSON_1,
             SCHEMA_JSON_3,
             SCHEMA_JSON_4,
+            SCHEMA_JSON_5,
             SCHEMA_JSON_WITH_AGG
         };
     }
@@ -132,13 +157,12 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
     @Test
     void testCompatibilityFromJsonLackOfColumnId() {
         String[] jsons = jsonLackOfColumnId();
-        Schema[] expectedSchema = new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, 
SCHEMA_3};
+        Schema[] expectedSchema = new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, 
SCHEMA_3, SCHEMA_5};
         for (int i = 0; i < jsons.length; i++) {
-            assertThat(
-                            JsonSerdeUtils.readValue(
-                                    jsons[i].getBytes(StandardCharsets.UTF_8),
-                                    SchemaJsonSerde.INSTANCE))
-                    .isEqualTo(expectedSchema[i]);
+            assertEquals(
+                    JsonSerdeUtils.readValue(
+                            jsons[i].getBytes(StandardCharsets.UTF_8), 
SchemaJsonSerde.INSTANCE),
+                    expectedSchema[i]);
         }
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 3088c9d30..9aef6c931 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -198,16 +198,16 @@ public class FlinkConversions {
         }
 
         // first build schema with physical columns
-        schemBuilder.fromColumns(
-                resolvedSchema.getColumns().stream()
-                        .filter(Column::isPhysical)
-                        .map(
-                                column ->
-                                        new Schema.Column(
-                                                column.getName(),
-                                                
FlinkConversions.toFlussType(column.getDataType()),
-                                                
column.getComment().orElse(null)))
-                        .collect(Collectors.toList()));
+        resolvedSchema.getColumns().stream()
+                .filter(Column::isPhysical)
+                .forEachOrdered(
+                        column -> {
+                            schemBuilder
+                                    .column(
+                                            column.getName(),
+                                            
FlinkConversions.toFlussType(column.getDataType()))
+                                    
.withComment(column.getComment().orElse(null));
+                        });
 
         // convert some flink options to fluss table configs.
         Map<String, String> storageProperties =
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
index 0c4432986..70d581fda 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
@@ -17,7 +17,11 @@
 
 package org.apache.fluss.flink.sink;
 
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.table.Table;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -46,6 +50,7 @@ import java.util.List;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
 import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration tests for Array type support in Flink connector. */
@@ -592,4 +597,74 @@ abstract class FlinkComplexTypeITCase extends 
AbstractTestBase {
                         "Bucket key column 'info' has unsupported data type 
ROW<`name` STRING, `age` INT>. "
                                 + "Currently, bucket key column does not 
support types: [ARRAY, MAP, ROW].");
     }
+
+    @Test
+    void testProjectionAndAddColumnInLogTable() throws Exception {
+        tEnv.executeSql(
+                "create table row_log_test ("
+                        + "id int, "
+                        + "simple_row row<a int, b string>, "
+                        + "nested_row row<x int, y row<z int, w string>, v 
string>, "
+                        + "array_of_rows array<row<a int, b string>>, "
+                        + "data string "
+                        + ") with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO row_log_test VALUES "
+                                + "(1, ROW(10, 'hello'), ROW(20, ROW(30, 
'nested'), 'row1'), "
+                                + "ARRAY[ROW(1, 'a'), ROW(2, 'b')], 'aa'), "
+                                + "(2, ROW(40, 'world'), ROW(50, ROW(60, 
'test'), 'row2'), "
+                                + "ARRAY[ROW(3, 'c')], 'bb')")
+                .await();
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from 
row_log_test").collect();
+        List<String> expectedRows =
+                Arrays.asList(
+                        "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], 
[+I[1, a], +I[2, b]], aa]",
+                        "+I[2, +I[40, world], +I[50, +I[60, test], row2], 
[+I[3, c]], bb]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+        // Currently, Flink cannot push down nested row projection 
because
+        // FlinkTableSource.supportsNestedProjection returns false.
+        // TODO: support nested row projection pushdown in
+        // https://github.com/apache/fluss/issues/2311 later.
+        String s =
+                tEnv.explainSql("select id, simple_row.a, nested_row.y.z, data 
from row_log_test");
+        assertThat(s)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
row_log_test, project=[id, simple_row, nested_row, data]]]");
+        rowIter =
+                tEnv.executeSql("select id, simple_row.a, nested_row.y.z, data 
from row_log_test")
+                        .collect();
+        expectedRows = Arrays.asList("+I[1, 10, 30, aa]", "+I[2, 40, 60, bb]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+        // Test add columns
+        tEnv.executeSql(
+                "alter table row_log_test add ("
+                        + "simple_row2 row<a int, b string>, "
+                        + "nested_row2 row<x int, y row<z int, w string>, v 
string>, "
+                        + "array_of_rows2 array<row<a int, b string>>)");
+
+        tEnv.executeSql(
+                        "INSERT INTO row_log_test VALUES "
+                                + "(1, ROW(10, 'hello'), ROW(20, ROW(30, 
'nested'), 'row1'), ARRAY[ROW(1, 'a'), ROW(2, 'b')], 'aa',  ROW(10, 'hello'), 
ROW(20, ROW(30, 'nested'), 'row1'), ARRAY[ROW(1, 'a'), ROW(2, 'b')]),"
+                                + "(2, ROW(40, 'world'), ROW(50, ROW(60, 
'test'), 'row2'), ARRAY[ROW(3, 'c')], 'bb', ROW(40, 'world'), ROW(50, ROW(60, 
'test'), 'row2'), ARRAY[ROW(3, 'c')])")
+                .await();
+        rowIter = tEnv.executeSql("select * from row_log_test").collect();
+        expectedRows =
+                Arrays.asList(
+                        "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], 
[+I[1, a], +I[2, b]], aa, null, null, null]",
+                        "+I[2, +I[40, world], +I[50, +I[60, test], row2], 
[+I[3, c]], bb, null, null, null]",
+                        "+I[1, +I[10, hello], +I[20, +I[30, nested], row1], 
[+I[1, a], +I[2, b]], aa, +I[10, hello], +I[20, +I[30, nested], row1], [+I[1, 
a], +I[2, b]]]",
+                        "+I[2, +I[40, world], +I[50, +I[60, test], row2], 
[+I[3, c]], bb, +I[40, world], +I[50, +I[60, test], row2], [+I[3, c]]]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+        try (Connection conn =
+                        ConnectionFactory.createConnection(
+                                FLUSS_CLUSTER_EXTENSION.getClientConfig());
+                Table table = conn.getTable(TablePath.of(DEFAULT_DB, 
"row_log_test"))) {
+            // check field id
+            org.apache.fluss.metadata.Schema schema = 
table.getTableInfo().getSchema();
+            assertThat(schema.getColumnIds()).containsExactly(0, 1, 4, 10, 13, 
14, 17, 23);
+        }
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
index fffb0c878..ad142a6c5 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
@@ -24,7 +24,9 @@ import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeChecks;
 import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.api.Schema;
@@ -55,6 +57,7 @@ import static 
org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
 import static org.apache.fluss.flink.utils.CatalogTableTestUtils.addOptions;
 import static 
org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema;
+import static org.apache.fluss.types.DataTypes.FIELD;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -85,8 +88,8 @@ public class FlinkConversionsTest {
                         DataTypes.ARRAY(DataTypes.STRING()),
                         DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
                         DataTypes.ROW(
-                                DataTypes.FIELD("a", 
DataTypes.STRING().copy(false)),
-                                DataTypes.FIELD("b", DataTypes.INT())));
+                                FIELD("a", DataTypes.STRING().copy(false)),
+                                FIELD("b", DataTypes.INT())));
 
         // flink types
         List<org.apache.flink.table.types.DataType> flinkTypes =
@@ -133,6 +136,13 @@ public class FlinkConversionsTest {
         }
         assertThat(actualFlussTypes).isEqualTo(flussTypes);
 
+        // check the field id of rowType.
+        assertThat(
+                        DataTypeChecks.equalsWithFieldId(
+                                actualFlussTypes.get(actualFlussTypes.size() - 
1),
+                                flussTypes.get(flinkTypes.size() - 1)))
+                .isTrue();
+
         // test conversion for data types not supported in Fluss
         assertThatThrownBy(() -> FlinkConversions.toFlussType(VARCHAR(10)))
                 .isInstanceOf(UnsupportedOperationException.class)
@@ -158,6 +168,34 @@ public class FlinkConversionsTest {
                                 Column.physical(
                                         "order_id",
                                         
org.apache.flink.table.api.DataTypes.STRING().notNull()),
+                                Column.physical(
+                                        "item",
+                                        
org.apache.flink.table.api.DataTypes.ROW(
+                                                
org.apache.flink.table.api.DataTypes.FIELD(
+                                                        "item_id",
+                                                        
org.apache.flink.table.api.DataTypes
+                                                                .STRING()),
+                                                
org.apache.flink.table.api.DataTypes.FIELD(
+                                                        "item_price",
+                                                        
org.apache.flink.table.api.DataTypes
+                                                                .STRING()),
+                                                
org.apache.flink.table.api.DataTypes.FIELD(
+                                                        "item_details",
+                                                        
org.apache.flink.table.api.DataTypes.ROW(
+                                                                
org.apache.flink.table.api.DataTypes
+                                                                        .FIELD(
+                                                                               
 "category",
+                                                                               
 org.apache.flink
+                                                                               
         .table.api
+                                                                               
         .DataTypes
+                                                                               
         .STRING()),
+                                                                
org.apache.flink.table.api.DataTypes
+                                                                        .FIELD(
+                                                                               
 "specifications",
+                                                                               
 org.apache.flink
+                                                                               
         .table.api
+                                                                               
         .DataTypes
+                                                                               
         .STRING()))))),
                                 Column.physical(
                                         "orig_ts",
                                         
org.apache.flink.table.api.DataTypes.TIMESTAMP()),
@@ -181,18 +219,37 @@ public class FlinkConversionsTest {
         String expectFlussTableString =
                 "TableDescriptor{schema=("
                         + "order_id STRING NOT NULL,"
+                        + "item ROW<`item_id` STRING, `item_price` STRING, 
`item_details` ROW<`category` STRING, `specifications` STRING>>,"
                         + "orig_ts TIMESTAMP(6),"
                         + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)"
                         + "), comment='test comment', partitionKeys=[], "
                         + "tableDistribution={bucketKeys=[order_id] 
bucketCount=null}, "
                         + "properties={}, "
-                        + 
"customProperties={schema.watermark.0.strategy.expr=orig_ts, "
-                        + "schema.2.expr=orig_ts, 
schema.2.data-type=TIMESTAMP(3), "
+                        + "customProperties={"
+                        + "schema.3.data-type=TIMESTAMP(3), "
+                        + "schema.watermark.0.strategy.expr=orig_ts, "
+                        + "schema.3.name=compute_ts, "
                         + "schema.watermark.0.rowtime=orig_ts, "
                         + "schema.watermark.0.strategy.data-type=TIMESTAMP(3), 
"
                         + "k1=v1, k2=v2, "
-                        + "schema.2.name=compute_ts}}";
+                        + "schema.3.expr=orig_ts}}";
         assertThat(flussTable.toString()).isEqualTo(expectFlussTableString);
+        assertThat(flussTable.getSchema().getColumnIds()).containsExactly(0, 
1, 7);
+        // check the nested row column "item"
+        org.apache.fluss.metadata.Schema.Column column = 
flussTable.getSchema().getColumns().get(1);
+        assertThat(column.getName()).isEqualTo("item");
+        assertThat(column.getDataType()).isInstanceOf(RowType.class);
+        RowType rowType = (RowType) column.getDataType();
+        assertThat(rowType.getFields())
+                .containsExactly(
+                        FIELD("item_id", DataTypes.STRING(), 2),
+                        FIELD("item_price", DataTypes.STRING(), 3),
+                        FIELD(
+                                "item_details",
+                                DataTypes.ROW(
+                                        FIELD("category", DataTypes.STRING(), 
5),
+                                        FIELD("specifications", 
DataTypes.STRING(), 6)),
+                                4));
 
         // test convert fluss table to flink table
         TablePath tablePath = TablePath.of("db", "table");
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index 3162e4bed..eff79605a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -21,6 +21,8 @@ import org.apache.fluss.exception.SchemaChangeException;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.ReassignFieldId;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -106,12 +108,11 @@ public class SchemaUpdate {
                     "Column " + addColumn.getName() + " must be nullable.");
         }
 
+        int columnId = highestFieldId.incrementAndGet();
+        DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(), 
highestFieldId);
+
         Schema.Column newColumn =
-                new Schema.Column(
-                        addColumn.getName(),
-                        addColumn.getDataType(),
-                        addColumn.getComment(),
-                        (byte) highestFieldId.incrementAndGet());
+                new Schema.Column(addColumn.getName(), dataType, 
addColumn.getComment(), columnId);
         columns.add(newColumn);
         existedColumns.put(newColumn.getName(), newColumn);
         return this;
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index 8f12e43c4..f21d5bdd8 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -63,7 +63,9 @@ import 
org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.BucketAssignment;
 import org.apache.fluss.server.zk.data.TableAssignment;
+import org.apache.fluss.types.DataTypeChecks;
 import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.json.DataTypeJsonSerde;
 import org.apache.fluss.utils.json.JsonSerdeUtils;
 
@@ -699,7 +701,19 @@ class TableManagerITCase {
                         pbTableMetadata.getCreatedTime(),
                         pbTableMetadata.getModifiedTime());
         List<Schema.Column> columns = tableInfo.getSchema().getColumns();
-        assertThat(columns.size()).isEqualTo(3);
+        assertThat(columns.size()).isEqualTo(4);
+        assertThat(tableInfo.getSchema().getColumnIds()).containsExactly(0, 1, 
2, 5);
+        // check nested row's field_id.
+        assertThat(columns.get(2).getName()).isEqualTo("new_nested_column");
+        assertThat(
+                        DataTypeChecks.equalsWithFieldId(
+                                columns.get(2).getDataType(),
+                                new RowType(
+                                        true,
+                                        Arrays.asList(
+                                                DataTypes.FIELD("f0", 
DataTypes.STRING(), 3),
+                                                DataTypes.FIELD("f1", 
DataTypes.INT(), 4)))))
+                .isTrue();
     }
 
     private void checkBucketMetadata(int expectBucketCount, 
List<PbBucketMetadata> bucketMetadata) {
@@ -830,6 +844,17 @@ class TableManagerITCase {
 
     private static List<PbAddColumn> alterTableAddColumns() {
         List<PbAddColumn> addColumns = new ArrayList<>();
+        PbAddColumn newNestedColumn = new PbAddColumn();
+        newNestedColumn
+                .setColumnName("new_nested_column")
+                .setDataTypeJson(
+                        JsonSerdeUtils.writeValueAsBytes(
+                                DataTypes.ROW(DataTypes.STRING(), 
DataTypes.INT()),
+                                DataTypeJsonSerde.INSTANCE))
+                .setComment("new_nested_column")
+                .setColumnPositionType(0);
+        addColumns.add(newNestedColumn);
+
         PbAddColumn newColumn = new PbAddColumn();
         newColumn
                 .setColumnName("new_column")
@@ -839,6 +864,7 @@ class TableManagerITCase {
                 .setComment("new_column")
                 .setColumnPositionType(0);
         addColumns.add(newColumn);
+
         return addColumns;
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
index 01cf2701a..5a28f9e2b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java
@@ -330,7 +330,7 @@ public class TabletServiceITCase {
                 tableId,
                 0,
                 Errors.INVALID_COLUMN_PROJECTION.code(),
-                "Projected field id 2 is not contained in [0, 1]");
+                "Projected fields [2, 3] is out of bound for schema with 2 
fields.");
     }
 
     @Test

Reply via email to