This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2185afb3d [coordinator] Refactor SchemaUpdate to delegate schema changes to Schema.Builder (#2416)
2185afb3d is described below

commit 2185afb3df9682064ce18db840381dc139651923
Author: Prajwal banakar <[email protected]>
AuthorDate: Fri Jan 23 17:33:02 2026 +0530

    [coordinator] Refactor SchemaUpdate to delegate schema changes to Schema.Builder (#2416)
    
    * Refactor SchemaUpdate to delegate schema changes to Schema.Builder
    
    * Rerunning CI to bypass flaky test
    
    * Enforce empty state requirement for Schema.Builder#fromSchema method
    
    ---------
    
    Co-authored-by: Jark Wu <[email protected]>
---
 .../java/org/apache/fluss/metadata/Schema.java     | 45 ++++++++++++-----
 .../fluss/server/coordinator/SchemaUpdate.java     | 58 +++++++---------------
 2 files changed, 49 insertions(+), 54 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index e2960dcc7..92b675c42 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -285,18 +285,21 @@ public final class Schema implements Serializable {
 
         /** Adopts all members from the given schema. */
         public Builder fromSchema(Schema schema) {
-            columns.addAll(schema.columns);
-            if (schema.primaryKey != null) {
-                primaryKeyNamed(schema.primaryKey.constraintName, 
schema.primaryKey.columnNames);
-            }
-            if (schema.autoIncrementColumnNames != null
-                    && !schema.autoIncrementColumnNames.isEmpty()) {
-                checkState(
-                        schema.autoIncrementColumnNames.size() == 1,
-                        "Multiple auto increment columns are not supported 
yet.");
-                enableAutoIncrement(schema.autoIncrementColumnNames.get(0));
-            }
-            this.highestFieldId = new AtomicInteger(schema.highestFieldId);
+            // Check that the builder is empty before adopting from an 
existing schema
+            checkState(
+                    columns.isEmpty() && autoIncrementColumnNames.isEmpty() && 
primaryKey == null,
+                    "Schema.Builder#fromSchema should be the first API to be 
called on the builder.");
+
+            // Adopt columns while preserving their original IDs
+            this.fromColumns(schema.getColumns());
+
+            // Sync the highest field ID counter to prevent ID conflicts
+            this.highestFieldId.set(schema.getHighestFieldId());
+
+            // Copy the metadata members
+            
this.autoIncrementColumnNames.addAll(schema.getAutoIncrementColumnNames());
+            schema.getPrimaryKey().ifPresent(pk -> this.primaryKey = pk);
+
             return this;
         }
 
@@ -327,16 +330,25 @@ public final class Schema implements Serializable {
          *     some not set)
          */
         public Builder fromColumns(List<Column> inputColumns) {
+
             boolean nonSetColumnId =
                     inputColumns.stream()
                             .noneMatch(column -> column.columnId != 
Column.UNKNOWN_COLUMN_ID);
             boolean allSetColumnId =
                     inputColumns.stream()
                             .allMatch(column -> column.columnId != 
Column.UNKNOWN_COLUMN_ID);
+            // REFINED CHECK:
+            // Only throw if we are adopting a full schema (allSetColumnId)
+            // AND the builder already has columns assigned.
+            // We use !columns.isEmpty() as the primary signal of a "dirty" 
builder.
+            if (allSetColumnId && !inputColumns.isEmpty() && 
!this.columns.isEmpty()) {
+                throw new IllegalStateException(
+                        "Schema.Builder#fromColumns (or fromSchema) should be 
the first API to be called on the builder when adopting columns with IDs.");
+            }
+
             checkState(
                     nonSetColumnId || allSetColumnId,
                     "All columns must have columnId or none of them must have 
columnId.");
-
             if (allSetColumnId) {
                 columns.addAll(inputColumns);
                 List<Integer> allFieldIds = collectAllFieldIds(inputColumns);
@@ -546,6 +558,13 @@ public final class Schema implements Serializable {
             return this;
         }
 
+        /** Returns the column with the given name, if it exists. */
+        public Optional<Column> getColumn(String columnName) {
+            return columns.stream()
+                    .filter(column -> column.getName().equals(columnName))
+                    .findFirst();
+        }
+
         /** Returns an instance of an {@link Schema}. */
         public Schema build() {
             return new Schema(columns, primaryKey, highestFieldId.get(), 
autoIncrementColumnNames);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index e3527cb88..011871a0b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -21,15 +21,9 @@ import org.apache.fluss.exception.SchemaChangeException;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableInfo;
-import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.ReassignFieldId;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /** Schema update. */
 public class SchemaUpdate {
@@ -43,36 +37,16 @@ public class SchemaUpdate {
         return schemaUpdate.getSchema();
     }
 
-    private final List<Schema.Column> columns;
-    private final AtomicInteger highestFieldId;
-    private final List<String> primaryKeys;
-    private final Map<String, Schema.Column> existedColumns;
-    private final List<String> autoIncrementColumns;
+    // Now we only maintain the Builder
+    private final Schema.Builder builder;
 
     public SchemaUpdate(TableInfo tableInfo) {
-        this.columns = new ArrayList<>();
-        this.existedColumns = new HashMap<>();
-        this.highestFieldId = new 
AtomicInteger(tableInfo.getSchema().getHighestFieldId());
-        this.primaryKeys = tableInfo.getPrimaryKeys();
-        this.autoIncrementColumns = 
tableInfo.getSchema().getAutoIncrementColumnNames();
-        this.columns.addAll(tableInfo.getSchema().getColumns());
-        for (Schema.Column column : columns) {
-            existedColumns.put(column.getName(), column);
-        }
+        // Initialize builder from the current table schema
+        this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema());
     }
 
     public Schema getSchema() {
-        Schema.Builder builder =
-                Schema.newBuilder()
-                        .fromColumns(columns)
-                        .highestFieldId((short) highestFieldId.get());
-        if (!primaryKeys.isEmpty()) {
-            builder.primaryKey(primaryKeys);
-        }
-        for (String autoIncrementColumn : autoIncrementColumns) {
-            builder.enableAutoIncrement(autoIncrementColumn);
-        }
-
+        // Validation and building are now delegated
         return builder.build();
     }
 
@@ -91,9 +65,10 @@ public class SchemaUpdate {
     }
 
     private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
-        Schema.Column existingColumn = existedColumns.get(addColumn.getName());
+        // Use the builder to check if column exists
+        Schema.Column existingColumn = 
builder.getColumn(addColumn.getName()).orElse(null);
+
         if (existingColumn != null) {
-            // Allow idempotent retries: if column name/type/comment match 
existing, treat as no-op
             if (!existingColumn.getDataType().equals(addColumn.getDataType())
                     || !Objects.equals(
                             existingColumn.getComment().orElse(null), 
addColumn.getComment())) {
@@ -103,8 +78,7 @@ public class SchemaUpdate {
             return this;
         }
 
-        TableChange.ColumnPosition position = addColumn.getPosition();
-        if (position != TableChange.ColumnPosition.last()) {
+        if (addColumn.getPosition() != TableChange.ColumnPosition.last()) {
             throw new IllegalArgumentException("Only support addColumn column 
at last now.");
         }
 
@@ -113,13 +87,15 @@ public class SchemaUpdate {
                     "Column " + addColumn.getName() + " must be nullable.");
         }
 
-        int columnId = highestFieldId.incrementAndGet();
-        DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(), 
highestFieldId);
+        // Delegate the actual addition to the builder
+        builder.column(addColumn.getName(), addColumn.getDataType());
+
+        // Fixed: Use null check for the String comment
+        String comment = addColumn.getComment();
+        if (comment != null) {
+            builder.withComment(comment);
+        }
 
-        Schema.Column newColumn =
-                new Schema.Column(addColumn.getName(), dataType, 
addColumn.getComment(), columnId);
-        columns.add(newColumn);
-        existedColumns.put(newColumn.getName(), newColumn);
         return this;
     }
 

Reply via email to