This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-add-column
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit e715276f23a517c8640a267c85d0aba0e040c874
Author: Jark Wu <[email protected]>
AuthorDate: Sun Nov 30 21:14:28 2025 +0800

    WIP
---
 .../org/apache/fluss/client/table/FlussTable.java  |  6 +---
 .../client/table/scanner/batch/BatchScanUtils.java |  3 +-
 .../table/scanner/batch/LimitBatchScanner.java     | 22 ++++++++++---
 .../table/scanner/batch/SnapshotFilesReader.java   | 19 +++++++++--
 .../client/table/scanner/log/BucketScanStatus.java |  3 +-
 .../fluss/client/table/scanner/log/LogFetcher.java |  3 +-
 .../client/table/writer/AbstractTableWriter.java   |  2 --
 .../fluss/client/write/RecordAccumulator.java      |  3 +-
 .../java/org/apache/fluss/client/write/Sender.java |  6 ++--
 .../fluss/client/admin/FlussAdminITCase.java       | 37 ++++++++++++++++++++--
 .../batch/KvSnapshotBatchScannerITCase.java        |  4 +--
 .../table/scanner/log/LogFetchCollectorTest.java   |  3 +-
 .../client/table/scanner/log/LogFetcherTest.java   |  1 +
 .../fluss/client/write/RecordAccumulatorTest.java  |  1 -
 .../org/apache/fluss/exception/ApiException.java   | 16 +++++++++-
 .../fluss/exception/SchemaChangeException.java     |  4 +++
 .../java/org/apache/fluss/metadata/Schema.java     |  1 +
 .../org/apache/fluss/metadata/TableChange.java     |  2 +-
 .../java/org/apache/fluss/metadata/TableInfo.java  |  5 +++
 .../java/org/apache/fluss/utils/Projection.java    |  6 +---
 .../java/org/apache/fluss/utils/SchemaUtil.java    | 21 ++++++++----
 .../server/metadata/SchemaMetadataManager.java     |  1 +
 22 files changed, 122 insertions(+), 47 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java 
b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java
index 629abe6ab..8532f2a85 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java
@@ -29,7 +29,6 @@ import org.apache.fluss.client.table.writer.TableAppend;
 import org.apache.fluss.client.table.writer.TableUpsert;
 import org.apache.fluss.client.table.writer.Upsert;
 import org.apache.fluss.metadata.SchemaGetter;
-import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
@@ -55,10 +54,7 @@ public class FlussTable implements Table {
         this.tableInfo = tableInfo;
         this.hasPrimaryKey = tableInfo.hasPrimaryKey();
         this.schemaGetter =
-                new ClientSchemaGetter(
-                        tableInfo.getTablePath(),
-                        new SchemaInfo(tableInfo.getSchema(), 
tableInfo.getSchemaId()),
-                        conn.getAdmin());
+                new ClientSchemaGetter(tablePath, tableInfo.getSchemaInfo(), 
conn.getAdmin());
     }
 
     @Override
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
index f4e150f30..d36bf9a7d 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
@@ -71,8 +71,7 @@ public class BatchScanUtils {
                     // If the scanner has more data, add it back to the queue
                     scannerQueue.add(scanner);
                 } else {
-                    // Close the scanner if it has no more data, and not add 
it back to the
-                    // queue
+                    // Close the scanner if it has no more data, and do not 
add it back to the queue
                     scanner.close();
                 }
             } catch (Exception e) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
index e6208f368..06c79fce8 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java
@@ -39,6 +39,7 @@ import org.apache.fluss.rpc.messages.LimitScanRequest;
 import org.apache.fluss.rpc.messages.LimitScanResponse;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.CloseableIterator;
+import org.apache.fluss.utils.SchemaUtil;
 
 import javax.annotation.Nullable;
 
@@ -47,7 +48,9 @@ import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -64,6 +67,12 @@ public class LimitBatchScanner implements BatchScanner {
     private final KvFormat kvFormat;
     private final int targetSchemaId;
 
+    /**
+     * A cache for schema projection mapping from source schema to target. Use 
HashMap here, because
+     * LimitBatchScanner is used in single thread only.
+     */
+    private final Map<Short, int[]> schemaProjectionCache = new HashMap<>();
+
     private boolean endOfInput;
 
     public LimitBatchScanner(
@@ -142,11 +151,14 @@ public class LimitBatchScanner implements BatchScanner {
             for (ValueRecord record : valueRecords.records(readContext)) {
                 InternalRow row = record.getRow();
                 if (targetSchemaId != record.schemaId()) {
-                    row =
-                            ProjectedRow.from(
-                                            
schemaGetter.getSchema(record.schemaId()),
-                                            
schemaGetter.getSchema(targetSchemaId))
-                                    .replaceRow(row);
+                    int[] indexMapping =
+                            schemaProjectionCache.computeIfAbsent(
+                                    record.schemaId(),
+                                    sourceSchemaId ->
+                                            SchemaUtil.getIndexMapping(
+                                                    
schemaGetter.getSchema(sourceSchemaId),
+                                                    
schemaGetter.getSchema(targetSchemaId)));
+                    row = ProjectedRow.from(indexMapping).replaceRow(row);
                 }
                 scanRows.add(maybeProject(row));
             }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
index 10e773609..a7c5e2969 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java
@@ -30,6 +30,7 @@ import org.apache.fluss.row.encode.ValueDecoder;
 import org.apache.fluss.utils.CloseableIterator;
 import org.apache.fluss.utils.CloseableRegistry;
 import org.apache.fluss.utils.IOUtils;
+import org.apache.fluss.utils.SchemaUtil;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
@@ -43,6 +44,8 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * A reader to read kv snapshot files to {@link ScanRecord}s. It will return 
the {@link ScanRecord}s
@@ -58,6 +61,12 @@ class SnapshotFilesReader implements 
CloseableIterator<InternalRow> {
     @Nullable private final int[] projectedFields;
     private RocksIteratorWrapper rocksIteratorWrapper;
 
+    /**
+     * A cache for schema projection mapping from source schema to target. Use 
HashMap here, because
+     * SnapshotFilesReader is used in single thread only.
+     */
+    private final Map<Short, int[]> schemaProjectionCache = new HashMap<>();
+
     private Snapshot snapshot;
     private RocksDBHandle rocksDBHandle;
     private boolean isClose = false;
@@ -158,9 +167,13 @@ class SnapshotFilesReader implements 
CloseableIterator<InternalRow> {
         ValueDecoder.Value originValue = valueDecoder.decodeValue(value);
         InternalRow originRow = originValue.row;
         if (targetSchemaId != originValue.schemaId) {
-            originRow =
-                    
ProjectedRow.from(schemaGetter.getSchema(originValue.schemaId), targetSchema)
-                            .replaceRow(originRow);
+            int[] indexMapping =
+                    schemaProjectionCache.computeIfAbsent(
+                            originValue.schemaId,
+                            sourceSchemaId ->
+                                    SchemaUtil.getIndexMapping(
+                                            
schemaGetter.getSchema(sourceSchemaId), targetSchema));
+            originRow = ProjectedRow.from(indexMapping).replaceRow(originRow);
         }
 
         if (projectedFields != null) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketScanStatus.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketScanStatus.java
index bee0910e1..738e04a49 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketScanStatus.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/BucketScanStatus.java
@@ -24,8 +24,7 @@ import org.apache.fluss.annotation.Internal;
 class BucketScanStatus {
     private long offset; // last consumed position
     private long highWatermark; // the high watermark from last fetch
-    // TODO add
-    //  resetStrategy and nextAllowedRetryTimeMs.
+    // TODO add resetStrategy and nextAllowedRetryTimeMs.
 
     public BucketScanStatus() {
         this.offset = 0L;
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index e8067dd4c..b19347772 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -364,8 +364,7 @@ public class LogFetcher implements Closeable {
                             if (!MemoryLogRecords.EMPTY.equals(logRecords)
                                     || fetchResultForBucket.getErrorCode() != 
Errors.NONE.code()) {
                                 // In oder to not signal notEmptyCondition, 
add completed
-                                // fetch to
-                                // buffer until log records is not empty.
+                                // fetch to buffer until log records are not 
empty.
                                 DefaultCompletedFetch completedFetch =
                                         new DefaultCompletedFetch(
                                                 tb,
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
index f1811dfa2..98a93fdd2 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
@@ -37,13 +37,11 @@ public abstract class AbstractTableWriter implements 
TableWriter {
     protected final TablePath tablePath;
     protected final WriterClient writerClient;
     protected final int fieldCount;
-    protected TableInfo tableInfo;
     private final @Nullable PartitionGetter partitionFieldGetter;
 
     protected AbstractTableWriter(
             TablePath tablePath, TableInfo tableInfo, WriterClient 
writerClient) {
         this.tablePath = tablePath;
-        this.tableInfo = tableInfo;
         this.writerClient = writerClient;
         this.fieldCount = tableInfo.getRowType().getFieldCount();
         this.partitionFieldGetter =
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index f7d7d3ef0..3a282ba92 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -868,8 +868,7 @@ public final class RecordAccumulator {
             // Either we have reached a point where there are batches without 
a sequence (i.e. never
             // been drained and are hence in order by default), or the batch 
at the front of the
             // queue has a sequence greater than the incoming batch. This is 
the right place to
-            // add
-            // the incoming batch.
+            // add the incoming batch.
             deque.addFirst(batch);
 
             // Now we have to re-insert the previously queued batches in the 
right order.
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
index 1b9c79397..e9c285306 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
@@ -130,7 +130,7 @@ public class Sender implements Runnable {
         this.idempotenceManager = idempotenceManager;
         this.writerMetricGroup = writerMetricGroup;
 
-        // TODOadd retry logic while send failed. See FLUSS-56364375
+        // TODO add retry logic while send failed. See FLUSS-56364375
     }
 
     @VisibleForTesting
@@ -167,7 +167,7 @@ public class Sender implements Runnable {
             }
         }
 
-        // TODO if force close failed,add logic to abort incomplete batches.
+        // TODO if force close failed, add logic to abort incomplete batches.
         LOG.debug("Shutdown of Fluss write sender I/O thread has completed.");
     }
 
@@ -245,7 +245,7 @@ public class Sender implements Runnable {
         if (!batches.isEmpty()) {
             addToInflightBatches(batches);
 
-            // TODOadd logic for batch expire.
+            // TODO add logic for batch expire.
 
             sendWriteRequests(batches);
 
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 05f196fdd..bd7954284 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -354,13 +354,43 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                                         .get())
                 .hasMessageContaining("Column c1 must be nullable");
 
+        assertThatThrownBy(
+                        () ->
+                                admin.alterTable(
+                                                tablePath,
+                                                Collections.singletonList(
+                                                        TableChange.addColumn(
+                                                                "c1",
+                                                                
DataTypes.STRING().copy(false),
+                                                                null,
+                                                                
TableChange.ColumnPosition
+                                                                        
.first())),
+                                                false)
+                                        .get())
+                .hasMessageContaining("Unsupported ColumnPositionType: FIRST");
+
+        assertThatThrownBy(
+                        () ->
+                                admin.alterTable(
+                                                tablePath,
+                                                Collections.singletonList(
+                                                        TableChange.addColumn(
+                                                                "c1",
+                                                                
DataTypes.STRING().copy(false),
+                                                                null,
+                                                                
TableChange.ColumnPosition.after(
+                                                                        
"name"))),
+                                                false)
+                                        .get())
+                .hasMessageContaining("Unsupported ColumnPositionType: 
AFTER(name)");
+
         admin.alterTable(
                         tablePath,
                         Collections.singletonList(
                                 TableChange.addColumn(
                                         "c1",
                                         DataTypes.STRING(),
-                                        null,
+                                        "new column c1",
                                         TableChange.ColumnPosition.last())),
                         false)
                 .get();
@@ -380,7 +410,10 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                                         new Schema.Column(
                                                 "age", DataTypes.INT(), 
"person age", (short) 2),
                                         new Schema.Column(
-                                                "c1", DataTypes.STRING(), 
null, (short) 3)))
+                                                "c1",
+                                                DataTypes.STRING(),
+                                                "new column c1",
+                                                (short) 3)))
                         .build();
         SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
         assertThat(schemaInfo).isEqualTo(new SchemaInfo(expectedSchema, 2));
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
index 527994777..62b5bdc13 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
@@ -134,7 +134,7 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
                         Collections.singletonList(
                                 TableChange.addColumn(
                                         "new_column",
-                                        
DataTypes.BIGINT().copy(false).copy(true),
+                                        DataTypes.BIGINT().copy(true),
                                         null,
                                         TableChange.ColumnPosition.last())),
                         false)
@@ -148,7 +148,7 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
                         .column("name", DataTypes.STRING())
                         .column("new_column", DataTypes.BIGINT())
                         .build();
-        // put into values with old schema.
+        // put into values with new schema.
         List<InternalRow> rows = new ArrayList<>();
         for (int i = 10; i < 20; i++) {
             InternalRow row =
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
index 83878a816..99a108e2f 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java
@@ -93,8 +93,7 @@ public class LogFetchCollectorTest {
         logFetchBuffer.add(completedFetch);
         assertThat(logFetchBuffer.isEmpty()).isFalse();
 
-        // Validate that the completed fetch isn't initialized just because we 
add it to the
-        // buffer.
+        // Validate that the completed fetch isn't initialized just because we 
add it to the buffer.
         assertThat(completedFetch.isInitialized()).isFalse();
 
         // Fetch the data and validate that we get all the records we want 
back.
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index 03f908733..e5a8c8ff0 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -199,6 +199,7 @@ public class LogFetcherTest extends 
ClientToServerITCaseBase {
         scanRecords = records.get(tb0);
         
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
                 .isEqualTo(expectedRows);
+        newSchemaLogFetcher.close();
     }
 
     @Test
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index bc4e53b50..47a9c2414 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -137,7 +137,6 @@ class RecordAccumulatorTest {
         conf = new Configuration();
         // init cluster.
         cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3));
-        // todo: 从cluster中获取schema getter
         schemaGetter = new TestingSchemaGetter(new SchemaInfo(DATA1_SCHEMA, 
(short) 1));
     }
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/ApiException.java 
b/fluss-common/src/main/java/org/apache/fluss/exception/ApiException.java
index 14b0b7f96..b577b8633 100644
--- a/fluss-common/src/main/java/org/apache/fluss/exception/ApiException.java
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/ApiException.java
@@ -25,21 +25,35 @@ public class ApiException extends FlussRuntimeException {
 
     private static final long serialVersionUID = 1L;
 
+    private final boolean stackTraceEnabled;
+
+    public ApiException(String message, Throwable cause, boolean 
stackTraceEnabled) {
+        super(message, cause);
+        this.stackTraceEnabled = stackTraceEnabled;
+    }
+
     public ApiException(String message, Throwable cause) {
         super(message, cause);
+        this.stackTraceEnabled = false;
     }
 
     public ApiException(String message) {
         super(message);
+        this.stackTraceEnabled = false;
     }
 
     public ApiException(Throwable cause) {
         super(cause);
+        this.stackTraceEnabled = false;
     }
 
     /* avoid the expensive and useless stack trace for api exceptions */
     @Override
     public Throwable fillInStackTrace() {
-        return this;
+        if (stackTraceEnabled) {
+            return super.fillInStackTrace();
+        } else {
+            return this;
+        }
     }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaChangeException.java
 
b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaChangeException.java
index 8c9446682..77983e441 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaChangeException.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaChangeException.java
@@ -27,4 +27,8 @@ public class SchemaChangeException extends ApiException {
     public SchemaChangeException(String message) {
         this(message, null);
     }
+
+    public SchemaChangeException(String message, boolean stackTraceEnabled) {
+        super(message, null, stackTraceEnabled);
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index 367013363..c065a3711 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -132,6 +132,7 @@ public final class Schema implements Serializable {
         return 
columns.stream().map(Column::getName).collect(Collectors.toList());
     }
 
+    /** Returns all column ids for top-level columns, the nested field ids are 
not included. */
     public List<Integer> getColumnIds() {
         return 
columns.stream().map(Column::getColumnId).collect(Collectors.toList());
     }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
index 36162f30c..e012a38f6 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
@@ -402,7 +402,7 @@ public interface TableChange {
 
         @Override
         public String toString() {
-            return String.format("AFTER %s", columnName);
+            return String.format("AFTER(%s)", columnName);
         }
     }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
index af293f120..eba41e078 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
@@ -137,6 +137,11 @@ public final class TableInfo {
         return schema;
     }
 
+    /** Returns the schema info of the table, including schema and schema id. 
*/
+    public SchemaInfo getSchemaInfo() {
+        return new SchemaInfo(schema, schemaId);
+    }
+
     /**
      * Returns the row type of the table. The row type is the schema of the 
table, which defines the
      * columns and types of the table.
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
index c597caf81..7c392b055 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/Projection.java
@@ -112,11 +112,7 @@ public class Projection {
         return projectionPositions;
     }
 
-    /**
-     * The id of the projection, which is used for fluss server scan.
-     *
-     * @return
-     */
+    /** The id of the projection, which is used for fluss server scan. */
     public int[] getProjectionIdInOrder() {
         return projectionIdsInOrder;
     }
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/SchemaUtil.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/SchemaUtil.java
index 6b66c0cd6..1815e71cd 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/SchemaUtil.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/SchemaUtil.java
@@ -39,6 +39,7 @@ public class SchemaUtil {
      * @param expectedSchema
      * @return
      */
+    // TODO
     public static int[] getTargetColumns(
             int[] targetColumns, Schema originSchema, Schema expectedSchema) {
         if (targetColumns == null) {
@@ -57,12 +58,17 @@ public class SchemaUtil {
     }
 
     /**
-     * Get the index of each value of expectedSchema from originSchema.
+     * Get the index mapping from origin schema to expected schema. This 
mapping can be used to
+     * convert the row with origin schema to the row with expected schema via 
{@code
+     * ProjectedRow.from(getIndexMapping(inputSchema, 
targetSchema)).replace(inputRow)}.
      *
-     * @param originSchema
-     * @param expectedSchema
-     * @return indexMapping the value of i-th means the expectedSchema's i-th 
column is the
-     *     originSchema's indexMapping[i]-th column.
+     * @param originSchema The origin schema.
+     * @param expectedSchema The expected schema.
+     * @return The index mapping array. The length of the array is the number 
of columns in the
+     *     expected schema. Each element in the array is the index of the 
corresponding column in
+     *     the origin schema. If a column in the expected schema does not 
exist in the origin
+     *     schema, the corresponding element is UNEXIST_MAPPING (-1).
+     * @throws SchemaChangeException if there is a datatype mismatch between 
the two schemas.
      */
     public static int[] getIndexMapping(Schema originSchema, Schema 
expectedSchema) {
         List<Schema.Column> originColumns = originSchema.getColumns();
@@ -84,7 +90,7 @@ public class SchemaUtil {
             if (originColumn != null
                     && !Objects.equals(
                             expectedColumn.getDataType().copy(true),
-                            
originColumns.get(indexMapping[i]).getDataType().copy(true))) {
+                            originColumn.getDataType().copy(true))) {
 
                 throw new SchemaChangeException(
                         String.format(
@@ -92,7 +98,8 @@ public class SchemaUtil {
                                 expectedColumn.getColumnId(),
                                 expectedColumn.getName(),
                                 expectedColumn.getDataType(),
-                                originColumn.getDataType()));
+                                originColumn.getDataType()),
+                        true);
             }
         }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java
index 8b6be5e39..6bed8dfe3 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/SchemaMetadataManager.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
  * 1. latest schema of each subscribed table, updated by UpdateMetadata 
request. 2. history schemas
  * of each table, updated by lookup from tablet server.
  */
+// TODO SchemaManager
 public class SchemaMetadataManager {
 
     private MetadataManager metadataManager;

Reply via email to