This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 2185afb3d [coordinator] Refactor SchemaUpdate to delegate schema changes to Schema.Builder (#2416)
2185afb3d is described below
commit 2185afb3df9682064ce18db840381dc139651923
Author: Prajwal banakar <[email protected]>
AuthorDate: Fri Jan 23 17:33:02 2026 +0530
[coordinator] Refactor SchemaUpdate to delegate schema changes to Schema.Builder (#2416)
* Refactor SchemaUpdate to delegate schema changes to Schema.Builder
* Rerunning CI to bypass flaky test
* Enforce empty state requirement for Schema.Builder#fromSchema method
---------
Co-authored-by: Jark Wu <[email protected]>
---
.../java/org/apache/fluss/metadata/Schema.java | 45 ++++++++++++-----
.../fluss/server/coordinator/SchemaUpdate.java | 58 +++++++---------------
2 files changed, 49 insertions(+), 54 deletions(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index e2960dcc7..92b675c42 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -285,18 +285,21 @@ public final class Schema implements Serializable {
/** Adopts all members from the given schema. */
public Builder fromSchema(Schema schema) {
- columns.addAll(schema.columns);
- if (schema.primaryKey != null) {
- primaryKeyNamed(schema.primaryKey.constraintName,
schema.primaryKey.columnNames);
- }
- if (schema.autoIncrementColumnNames != null
- && !schema.autoIncrementColumnNames.isEmpty()) {
- checkState(
- schema.autoIncrementColumnNames.size() == 1,
- "Multiple auto increment columns are not supported
yet.");
- enableAutoIncrement(schema.autoIncrementColumnNames.get(0));
- }
- this.highestFieldId = new AtomicInteger(schema.highestFieldId);
+ // Check that the builder is empty before adopting from an
existing schema
+ checkState(
+ columns.isEmpty() && autoIncrementColumnNames.isEmpty() &&
primaryKey == null,
+ "Schema.Builder#fromSchema should be the first API to be
called on the builder.");
+
+ // Adopt columns while preserving their original IDs
+ this.fromColumns(schema.getColumns());
+
+ // Sync the highest field ID counter to prevent ID conflicts
+ this.highestFieldId.set(schema.getHighestFieldId());
+
+ // Copy the metadata members
+
this.autoIncrementColumnNames.addAll(schema.getAutoIncrementColumnNames());
+ schema.getPrimaryKey().ifPresent(pk -> this.primaryKey = pk);
+
return this;
}
@@ -327,16 +330,25 @@ public final class Schema implements Serializable {
* some not set)
*/
public Builder fromColumns(List<Column> inputColumns) {
+
boolean nonSetColumnId =
inputColumns.stream()
.noneMatch(column -> column.columnId !=
Column.UNKNOWN_COLUMN_ID);
boolean allSetColumnId =
inputColumns.stream()
.allMatch(column -> column.columnId !=
Column.UNKNOWN_COLUMN_ID);
+ // REFINED CHECK:
+ // Only throw if we are adopting a full schema (allSetColumnId)
+ // AND the builder already has columns assigned.
+ // We use !columns.isEmpty() as the primary signal of a "dirty"
builder.
+ if (allSetColumnId && !inputColumns.isEmpty() &&
!this.columns.isEmpty()) {
+ throw new IllegalStateException(
+ "Schema.Builder#fromColumns (or fromSchema) should be
the first API to be called on the builder when adopting columns with IDs.");
+ }
+
checkState(
nonSetColumnId || allSetColumnId,
"All columns must have columnId or none of them must have
columnId.");
-
if (allSetColumnId) {
columns.addAll(inputColumns);
List<Integer> allFieldIds = collectAllFieldIds(inputColumns);
@@ -546,6 +558,13 @@ public final class Schema implements Serializable {
return this;
}
+ /** Returns the column with the given name, if it exists. */
+ public Optional<Column> getColumn(String columnName) {
+ return columns.stream()
+ .filter(column -> column.getName().equals(columnName))
+ .findFirst();
+ }
+
/** Returns an instance of an {@link Schema}. */
public Schema build() {
return new Schema(columns, primaryKey, highestFieldId.get(),
autoIncrementColumnNames);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index e3527cb88..011871a0b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -21,15 +21,9 @@ import org.apache.fluss.exception.SchemaChangeException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableInfo;
-import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.ReassignFieldId;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
/** Schema update. */
public class SchemaUpdate {
@@ -43,36 +37,16 @@ public class SchemaUpdate {
return schemaUpdate.getSchema();
}
- private final List<Schema.Column> columns;
- private final AtomicInteger highestFieldId;
- private final List<String> primaryKeys;
- private final Map<String, Schema.Column> existedColumns;
- private final List<String> autoIncrementColumns;
+ // Now we only maintain the Builder
+ private final Schema.Builder builder;
public SchemaUpdate(TableInfo tableInfo) {
- this.columns = new ArrayList<>();
- this.existedColumns = new HashMap<>();
- this.highestFieldId = new
AtomicInteger(tableInfo.getSchema().getHighestFieldId());
- this.primaryKeys = tableInfo.getPrimaryKeys();
- this.autoIncrementColumns =
tableInfo.getSchema().getAutoIncrementColumnNames();
- this.columns.addAll(tableInfo.getSchema().getColumns());
- for (Schema.Column column : columns) {
- existedColumns.put(column.getName(), column);
- }
+ // Initialize builder from the current table schema
+ this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema());
}
public Schema getSchema() {
- Schema.Builder builder =
- Schema.newBuilder()
- .fromColumns(columns)
- .highestFieldId((short) highestFieldId.get());
- if (!primaryKeys.isEmpty()) {
- builder.primaryKey(primaryKeys);
- }
- for (String autoIncrementColumn : autoIncrementColumns) {
- builder.enableAutoIncrement(autoIncrementColumn);
- }
-
+ // Validation and building are now delegated
return builder.build();
}
@@ -91,9 +65,10 @@ public class SchemaUpdate {
}
private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
- Schema.Column existingColumn = existedColumns.get(addColumn.getName());
+ // Use the builder to check if column exists
+ Schema.Column existingColumn =
builder.getColumn(addColumn.getName()).orElse(null);
+
if (existingColumn != null) {
- // Allow idempotent retries: if column name/type/comment match
existing, treat as no-op
if (!existingColumn.getDataType().equals(addColumn.getDataType())
|| !Objects.equals(
existingColumn.getComment().orElse(null),
addColumn.getComment())) {
@@ -103,8 +78,7 @@ public class SchemaUpdate {
return this;
}
- TableChange.ColumnPosition position = addColumn.getPosition();
- if (position != TableChange.ColumnPosition.last()) {
+ if (addColumn.getPosition() != TableChange.ColumnPosition.last()) {
throw new IllegalArgumentException("Only support addColumn column
at last now.");
}
@@ -113,13 +87,15 @@ public class SchemaUpdate {
"Column " + addColumn.getName() + " must be nullable.");
}
- int columnId = highestFieldId.incrementAndGet();
- DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(),
highestFieldId);
+ // Delegate the actual addition to the builder
+ builder.column(addColumn.getName(), addColumn.getDataType());
+
+ // Fixed: Use null check for the String comment
+ String comment = addColumn.getComment();
+ if (comment != null) {
+ builder.withComment(comment);
+ }
- Schema.Column newColumn =
- new Schema.Column(addColumn.getName(), dataType,
addColumn.getComment(), columnId);
- columns.add(newColumn);
- existedColumns.put(newColumn.getName(), newColumn);
return this;
}