wuchong commented on code in PR #2010:
URL: https://github.com/apache/fluss/pull/2010#discussion_r2569341166


##########
fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java:
##########
@@ -126,4 +213,196 @@ public String toString() {
             return "ResetOption{" + "key='" + key + '\'' + '}';
         }
     }
+
+    /** A table change to modify the table schema. */
+    interface SchemaChange extends TableChange {}
+
+    /**
+     * A table change to add a column.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; ADD &lt;column_definition&gt; &lt;column_position&gt;
+     * </pre>
+     */
+    class AddColumn implements SchemaChange {
+        private final String name;
+        private final DataType dataType;
+        private final @Nullable String comment;
+
+        private final ColumnPosition position;
+
+        private AddColumn(
+                String name, DataType dataType, @Nullable String comment, ColumnPosition position) {
+            this.name = name;
+            this.dataType = dataType;
+            this.comment = comment;
+            this.position = position;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public DataType getDataType() {
+            return dataType;
+        }
+
+        @Nullable
+        public String getComment() {
+            return comment;
+        }
+
+        public ColumnPosition getPosition() {
+            return position;
+        }
+    }
+
+    /** A table change to drop a column. */
+    class DropColumn implements SchemaChange {
+        private final String name;
+
+        private DropColumn(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+    }
+
+    /** A table change to modify a column. */
+    class ModifyColumn implements SchemaChange {
+        private final String name;
+        private DataType dataType;
+        private final @Nullable String comment;
+
+        private final @Nullable ColumnPosition newPosition;
+
+        private ModifyColumn(
+                String name,
+                DataType dataType,
+                @Nullable String comment,
+                @Nullable ColumnPosition newPosition) {
+            this.name = name;
+            this.dataType = dataType;
+            this.comment = comment;
+            this.newPosition = newPosition;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public DataType getDataType() {
+            return dataType;
+        }
+
+        @Nullable
+        public String getComment() {
+            return comment;
+        }
+
+        @Nullable
+        public ColumnPosition getNewPosition() {
+            return newPosition;
+        }
+    }
+
+    /** A table change to modify a column's name. */
+    class RenameColumn implements SchemaChange {
+        private final String oldColumnName;
+        private final String newColumnName;
+
+        private RenameColumn(String oldColumnName, String newColumnName) {
+            this.oldColumnName = oldColumnName;
+            this.newColumnName = newColumnName;
+        }
+
+        public String getOldColumnName() {
+            return oldColumnName;
+        }
+
+        public String getNewColumnName() {
+            return newColumnName;
+        }
+    }
+
+    /** The position of the modified or added column. */
+    interface ColumnPosition {
+        /** Get the position to place the column at the first. */
+        static ColumnPosition last() {
+            return LAST.INSTANCE;
+        }
+
+        /** Get the position to place the column at the first. */
+        static ColumnPosition first() {
+            return First.INSTANCE;
+        }
+
+        /** Get the position to place the column after the specified column. */
+        static ColumnPosition after(String column) {
+            return new After(column);
+        }
+    }
+
+    /** Column position FIRST means the specified column should be the first column. */
+    final class First implements ColumnPosition {
+        private static final First INSTANCE = new First();
+
+        private First() {}
+
+        @Override
+        public String toString() {
+            return "FIRST";
+        }
+    }
+
+    /** Column position LAST means the specified column should be the last column. */
+    final class LAST implements ColumnPosition {

Review Comment:
   Use camel-case `Last` to stay aligned with `First` and the others.
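
   A minimal sketch of the rename, written as stand-alone top-level types so it compiles on its own (in the PR these are nested in `TableChange`, where `INSTANCE` can stay `private`). The `toString()` value of `Last` is an assumption, since the hunk ends before it, and the Javadoc on `last()` is adjusted because the current one repeats the text of `first()`:

```java
/** The position of the modified or added column (reduced sketch). */
interface ColumnPosition {
    /** Get the position to place the column at the last. */
    static ColumnPosition last() {
        return Last.INSTANCE;
    }

    /** Get the position to place the column at the first. */
    static ColumnPosition first() {
        return First.INSTANCE;
    }
}

/** Column position FIRST means the specified column should be the first column. */
final class First implements ColumnPosition {
    // Package-private only because this sketch uses top-level classes.
    static final First INSTANCE = new First();

    private First() {}

    @Override
    public String toString() {
        return "FIRST";
    }
}

/** Column position LAST means the specified column should be the last column. */
final class Last implements ColumnPosition {
    static final Last INSTANCE = new Last();

    private Last() {}

    // Assumed to mirror First#toString(); not shown in the reviewed hunk.
    @Override
    public String toString() {
        return "LAST";
    }
}
```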



##########
fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java:
##########
@@ -126,4 +213,196 @@ public String toString() {
             return "ResetOption{" + "key='" + key + '\'' + '}';
         }
     }
+
+    /** A table change to modify the table schema. */
+    interface SchemaChange extends TableChange {}
+
+    /**
+     * A table change to add a column.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; ADD &lt;column_definition&gt; &lt;column_position&gt;
+     * </pre>
+     */
+    class AddColumn implements SchemaChange {
+        private final String name;
+        private final DataType dataType;
+        private final @Nullable String comment;
+
+        private final ColumnPosition position;
+
+        private AddColumn(
+                String name, DataType dataType, @Nullable String comment, ColumnPosition position) {
+            this.name = name;
+            this.dataType = dataType;
+            this.comment = comment;
+            this.position = position;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public DataType getDataType() {
+            return dataType;
+        }
+
+        @Nullable
+        public String getComment() {
+            return comment;
+        }
+
+        public ColumnPosition getPosition() {
+            return position;
+        }
+    }
+
+    /** A table change to drop a column. */
+    class DropColumn implements SchemaChange {
+        private final String name;
+
+        private DropColumn(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+    }
+
+    /** A table change to modify a column. */
+    class ModifyColumn implements SchemaChange {
+        private final String name;
+        private DataType dataType;

Review Comment:
   This field can be `final`.



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScanner.java:
##########
@@ -113,16 +121,6 @@ public KvSnapshotBatchScanner(
         initReaderAsynchronously();
     }
 
-    /**
-     * Fetch data from snapshot.
-     *
-     * <p>If the snapshot file reader is not ready in given maximum block time, return an empty
-     * iterator. If the reader is ready, always return the reader if there remains any record in the
-     * reader, otherwise, return null.
-     *
-     * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE}
-     *     milliseconds)
-     */

Review Comment:
   Why was the Javadoc removed?



##########
fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java:
##########
@@ -165,6 +166,8 @@ default boolean hasWriterId() {
     /** The read context of a {@link LogRecordBatch} to read records. */
     interface ReadContext {
 
+        boolean isSchemaChange(int schemaId);

Review Comment:
   Rename this to `isSchemaChanged` and add Javadoc.



##########
fluss-common/src/main/java/org/apache/fluss/row/PruneRow.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.RowType;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.fluss.utils.SchemaUtil.getIndexMapping;
+
+/**
+ * A specific implementation of {@link ProjectedRow} which prunes the columns from the origin schema
+ * to the target schema.
+ */
+public class PruneRow extends ProjectedRow {

Review Comment:
   Remove this class and move the `isNullAt` logic into `ProjectedRow`.



##########
fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java:
##########
@@ -55,7 +73,7 @@ public static class Value {
         public final short schemaId;
         public final BinaryRow row;
 
-        private Value(short schemaId, BinaryRow row) {
+        public Value(short schemaId, BinaryRow row) {

Review Comment:
   This is only used in `ValueDecoder`, so keep it `private`.



##########
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java:
##########
@@ -193,17 +218,28 @@ public boolean isProjectionPushDowned() {
 
     @Override
     public VectorSchemaRoot getVectorSchemaRoot(int schemaId) {
-        checkArgument(
-                schemaId == this.schemaId,
-                "The schemaId (%s) in the record batch is not the same as the context (%s).",
-                schemaId,
-                this.schemaId);
+        //        checkArgument(
+        //                schemaId == this.schemaId,
+        //                "The schemaId (%s) in the record batch is not the same as the context
+        // (%s).",
+        //                schemaId,
+        //                this.schemaId);

Review Comment:
   Remove this commented-out code.



##########
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java:
##########
@@ -217,18 +218,49 @@ public CloseableIterator<LogRecord> records(ReadContext context) {
         long timestamp = commitTimestamp();
         LogFormat logFormat = context.getLogFormat();
         RowType rowType = context.getRowType(schemaId);
+
+        CloseableIterator<LogRecord> records;
         switch (logFormat) {
             case ARROW:
-                return columnRecordIterator(
-                        rowType,
-                        context.getVectorSchemaRoot(schemaId),
-                        context.getBufferAllocator(),
-                        timestamp);
+                records =
+                        columnRecordIterator(
+                                rowType,
+                                context.getVectorSchemaRoot(schemaId),
+                                context.getBufferAllocator(),
+                                timestamp);
+                break;
             case INDEXED:
-                return rowRecordIterator(rowType, timestamp);
+                records = rowRecordIterator(rowType, timestamp);
+                break;
             default:
                 throw new IllegalArgumentException("Unsupported log format: " + logFormat);
         }
+
+        if (!context.isSchemaChange(schemaId)) {
+            return records;
+        }
+        PruneRow pruneRow = context.getPruneRow(schemaId);
+        return new CloseableIterator<>() {
+            @Override
+            public void close() {
+                records.close();
+            }
+
+            @Override
+            public boolean hasNext() {
+                return records.hasNext();
+            }
+
+            @Override
+            public LogRecord next() {
+                LogRecord next = records.next();
+                return new GenericRecord(
+                        next.logOffset(),
+                        next.timestamp(),
+                        next.getChangeType(),
+                        pruneRow.replaceRow(next.getRow()));
+            }
+        };

Review Comment:
   We can add two new methods to `org.apache.fluss.record.LogRecordBatch.ReadContext` and then remove `isSchemaChange` and `getPruneRow`.
   
   ```java
           /**
            * If the read context defines an output projection (for example, log records may add new
            * columns or reorder columns, but readers need a static schema for the output rows), return
            * a {@link ProjectedRow} that describes the projected output row for the given schemaId.
            * The returned object is used by readers to transform or materialize rows according to the
            * output projection. Returns {@code null} if no output projection is configured.
            *
            * @param schemaId the current row schema id
            * @return a {@link ProjectedRow} describing the output projection, or {@code null} if none
            */
           @Nullable
           ProjectedRow getOutputProjectedRow(int schemaId);
   ```
   
   Here we can then pass the nullable output `ProjectedRow` into `columnRecordIterator` and `rowRecordIterator` for the final projection, so we don't need to wrap the iterator again.
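
   A rough sketch of the idea, using hypothetical stand-in types (`RowSketch`, `ArrayRowSketch`, `ProjectedRowSketch`, `RowIteratorSketch`) rather than the real Fluss classes: the iterator receives the nullable projection and applies it in `next()`, so no second iterator is needed. Treating an index of -1 as "column absent, read as null" is an assumption about how the `isNullAt` logic could look once it lives in `ProjectedRow`.

```java
import java.util.Iterator;

/** Hypothetical minimal row abstraction standing in for the real row interface. */
interface RowSketch {
    int getFieldCount();

    boolean isNullAt(int pos);

    Object getField(int pos);
}

/** Hypothetical array-backed row. */
final class ArrayRowSketch implements RowSketch {
    private final Object[] fields;

    ArrayRowSketch(Object... fields) {
        this.fields = fields;
    }

    public int getFieldCount() {
        return fields.length;
    }

    public boolean isNullAt(int pos) {
        return fields[pos] == null;
    }

    public Object getField(int pos) {
        return fields[pos];
    }
}

/** Stand-in for ProjectedRow: a reusable view; mapping -1 marks a column absent in the stored row. */
final class ProjectedRowSketch implements RowSketch {
    private final int[] indexMapping; // output position -> position in the stored row
    private RowSketch row;

    ProjectedRowSketch(int[] indexMapping) {
        this.indexMapping = indexMapping;
    }

    ProjectedRowSketch replaceRow(RowSketch row) {
        this.row = row;
        return this;
    }

    public int getFieldCount() {
        return indexMapping.length;
    }

    public boolean isNullAt(int pos) {
        return indexMapping[pos] < 0 || row.isNullAt(indexMapping[pos]);
    }

    public Object getField(int pos) {
        return isNullAt(pos) ? null : row.getField(indexMapping[pos]);
    }
}

/** Iterator that applies the optional output projection itself instead of being wrapped. */
final class RowIteratorSketch implements Iterator<RowSketch> {
    private final Iterator<? extends RowSketch> rows;
    private final ProjectedRowSketch outputProjection; // may be null: no projection configured

    RowIteratorSketch(Iterator<? extends RowSketch> rows, ProjectedRowSketch outputProjection) {
        this.rows = rows;
        this.outputProjection = outputProjection;
    }

    public boolean hasNext() {
        return rows.hasNext();
    }

    public RowSketch next() {
        RowSketch row = rows.next();
        // The projected view is reused across calls, so callers must not hold on to it.
        return outputProjection == null ? row : outputProjection.replaceRow(row);
    }
}
```

   With a `null` projection the iterator returns the stored rows unchanged, which corresponds to the current fast path for batches whose schema already matches.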



##########
fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java:
##########
@@ -335,6 +335,23 @@ public static TableInfo of(
                 modifiedTime);
     }
 
+    /** Replace a TableInfo with a new SchemaInfo. */
+    public static TableInfo of(TableInfo tableInfo, SchemaInfo schemaInfo) {

Review Comment:
   Would providing a `public TableInfo withNewSchema(SchemaInfo schemaInfo)` method on the `TableInfo` class be more convenient?
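
   For illustration, an instance method could look roughly like this. `TableInfoSketch` and `SchemaInfoSketch` are reduced, hypothetical stand-ins with only a few fields; the real classes carry more state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, reduced stand-in for SchemaInfo. */
final class SchemaInfoSketch {
    final int schemaId;
    final List<String> columnNames;

    SchemaInfoSketch(int schemaId, List<String> columnNames) {
        this.schemaId = schemaId;
        // Defensive copy so the schema stays immutable after construction.
        this.columnNames = Collections.unmodifiableList(new ArrayList<>(columnNames));
    }
}

/** Hypothetical, reduced stand-in for TableInfo. */
final class TableInfoSketch {
    final String tablePath;
    final SchemaInfoSketch schemaInfo;
    final long modifiedTime;

    TableInfoSketch(String tablePath, SchemaInfoSketch schemaInfo, long modifiedTime) {
        this.tablePath = tablePath;
        this.schemaInfo = schemaInfo;
        this.modifiedTime = modifiedTime;
    }

    /** Returns a copy of this table info with the given schema; all other fields are kept. */
    TableInfoSketch withNewSchema(SchemaInfoSketch newSchemaInfo) {
        return new TableInfoSketch(tablePath, newSchemaInfo, modifiedTime);
    }
}
```

   Call sites would then read `tableInfo.withNewSchema(schemaInfo)` instead of `TableInfo.of(tableInfo, schemaInfo)`.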



##########
fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java:
##########
@@ -104,6 +105,14 @@ public Table getTable(TablePath tablePath) {
         return new FlussTable(this, tablePath, tableInfo);
     }
 
+    @Override
+    public Table getTable(TablePath tablePath, SchemaInfo schemaInfo) {

Review Comment:
   As we discussed, we don't need this now; just using `sourceOutputType` in `FlinkSource` is enough.


