This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 4a627623c [lake] Support ADD COLUMN AT LAST for dataLake enabled
tables (#2189)
4a627623c is described below
commit 4a627623c9f9d056aa5e4a3991ca1b2b503453ff
Author: Knock.Code <[email protected]>
AuthorDate: Sun Dec 28 15:05:06 2025 +0800
[lake] Support ADD COLUMN AT LAST for dataLake enabled tables (#2189)
---
.../org/apache/fluss/metadata/TableChange.java | 49 ++++++++
.../fluss/lake/paimon/PaimonLakeCatalog.java | 93 ++++++++++++++-
.../paimon/tiering/FlussRecordAsPaimonRow.java | 60 ++++++++--
.../fluss/lake/paimon/utils/PaimonConversions.java | 25 ++++
.../lake/paimon/LakeEnabledTableCreateITCase.java | 26 ++++-
.../fluss/lake/paimon/PaimonLakeCatalogTest.java | 130 +++++++++++++++++++++
.../paimon/tiering/FlussRecordAsPaimonRowTest.java | 61 ++++++++++
.../lake/paimon/tiering/PaimonTieringITCase.java | 74 +++++++++++-
.../server/coordinator/CoordinatorService.java | 15 ++-
.../fluss/server/coordinator/MetadataManager.java | 57 +++++++--
.../fluss/server/coordinator/SchemaUpdate.java | 14 ++-
.../coordinator/CoordinatorEventProcessorTest.java | 2 +-
.../event/watcher/TableChangeWatcherTest.java | 4 +-
13 files changed, 566 insertions(+), 44 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
index fcc8462d9..bb027d690 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
@@ -257,6 +257,22 @@ public interface TableChange {
public ColumnPosition getPosition() {
return position;
}
+
+ @Override
+ public String toString() {
+ return "AddColumn{"
+ + "name='"
+ + name
+ + '\''
+ + ", dataType="
+ + dataType
+ + ", comment='"
+ + comment
+ + '\''
+ + ", position="
+ + position
+ + '}';
+ }
}
/** A table change to drop a column. */
@@ -270,6 +286,11 @@ public interface TableChange {
public String getName() {
return name;
}
+
+ @Override
+ public String toString() {
+ return "DropColumn{" + "name='" + name + '\'' + '}';
+ }
}
/** A table change to modify a column. */
@@ -308,6 +329,22 @@ public interface TableChange {
public ColumnPosition getNewPosition() {
return newPosition;
}
+
+ @Override
+ public String toString() {
+ return "ModifyColumn{"
+ + "name='"
+ + name
+ + '\''
+ + ", dataType="
+ + dataType
+ + ", comment='"
+ + comment
+ + '\''
+ + ", newPosition="
+ + newPosition
+ + '}';
+ }
}
/** A table change to modify a column's name. */
@@ -327,6 +364,18 @@ public interface TableChange {
public String getNewColumnName() {
return newColumnName;
}
+
+ @Override
+ public String toString() {
+ return "RenameColumn{"
+ + "oldColumnName='"
+ + oldColumnName
+ + '\''
+ + ", newColumnName='"
+ + newColumnName
+ + '\''
+ + '}';
+ }
}
/** The position of the modified or added column. */
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 42bc01942..f8cb46ce4 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.paimon;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -38,6 +39,8 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.List;
@@ -54,6 +57,7 @@ import static
org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
/** A Paimon implementation of {@link LakeCatalog}. */
public class PaimonLakeCatalog implements LakeCatalog {
+ private static final Logger LOG =
LoggerFactory.getLogger(PaimonLakeCatalog.class);
public static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new
LinkedHashMap<>();
static {
@@ -109,13 +113,92 @@ public class PaimonLakeCatalog implements LakeCatalog {
throws TableNotExistException {
try {
List<SchemaChange> paimonSchemaChanges =
toPaimonSchemaChanges(tableChanges);
- alterTable(tablePath, paimonSchemaChanges);
- } catch (Catalog.ColumnAlreadyExistException |
Catalog.ColumnNotExistException e) {
- // shouldn't happen before we support schema change
- throw new RuntimeException(e);
+
+ // Compare current Paimon table schema with expected target schema
before altering
+ if (shouldAlterTable(tablePath, tableChanges)) {
+ alterTable(tablePath, paimonSchemaChanges);
+ } else {
+ // If schemas already match, treat as idempotent success
+ LOG.info(
+ "Skipping schema evolution for Paimon table {} because
the column(s) to add {} already exist.",
+ tablePath,
+ tableChanges);
+ }
+ } catch (Catalog.ColumnAlreadyExistException e) {
+ // This shouldn't happen if shouldAlterTable works correctly, but
keep as safeguard
+ throw new InvalidAlterTableException(e.getMessage());
+ } catch (Catalog.ColumnNotExistException e) {
+ // This shouldn't happen for AddColumn operations
+ throw new InvalidAlterTableException(e.getMessage());
}
}
+ private boolean shouldAlterTable(TablePath tablePath, List<TableChange>
tableChanges)
+ throws TableNotExistException {
+ try {
+ Table table = paimonCatalog.getTable(toPaimon(tablePath));
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ Schema currentSchema = fileStoreTable.schema().toSchema();
+
+ for (TableChange change : tableChanges) {
+ if (change instanceof TableChange.AddColumn) {
+ TableChange.AddColumn addColumn = (TableChange.AddColumn)
change;
+ if (!isColumnAlreadyExists(currentSchema, addColumn)) {
+ return true;
+ }
+ } else {
+ return true;
+ }
+ }
+
+ return false;
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException("Table " + tablePath + " does not
exist.");
+ }
+ }
+
+ private boolean isColumnAlreadyExists(Schema currentSchema,
TableChange.AddColumn addColumn) {
+ String columnName = addColumn.getName();
+
+ for (org.apache.paimon.types.DataField field : currentSchema.fields())
{
+ if (field.name().equals(columnName)) {
+ org.apache.paimon.types.DataType expectedType =
+ addColumn
+ .getDataType()
+ .accept(
+ org.apache.fluss.lake.paimon.utils
+
.FlussDataTypeToPaimonDataType.INSTANCE);
+
+ if (!field.type().equals(expectedType)) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "Column '%s' already exists but with
different type. "
+ + "Existing: %s, Expected: %s",
+ columnName, field.type(), expectedType));
+ }
+ String existingComment = field.description();
+ String expectedComment = addColumn.getComment();
+
+ boolean commentsMatch =
+ (existingComment == null && expectedComment == null)
+ || (existingComment != null
+ &&
existingComment.equals(expectedComment));
+
+ if (!commentsMatch) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "Column %s already exists but with
different comment. "
+ + "Existing: %s, Expected: %s",
+ columnName, existingComment,
expectedComment));
+ }
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private void createTable(TablePath tablePath, Schema schema, boolean
isCreatingFlussTable)
throws Catalog.DatabaseNotExistException {
Identifier paimonPath = toPaimon(tablePath);
@@ -134,7 +217,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
}
} catch (Catalog.TableNotExistException tableNotExistException) {
// shouldn't happen in normal cases
- throw new RuntimeException(
+ throw new InvalidAlterTableException(
String.format(
"Failed to create table %s in Paimon. The
table already existed "
+ "during the initial creation
attempt, but subsequently "
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index 24304d0e4..c092fbdab 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -27,7 +27,6 @@ import org.apache.paimon.types.RowType;
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static org.apache.fluss.utils.Preconditions.checkState;
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
@@ -35,27 +34,42 @@ public class FlussRecordAsPaimonRow extends
FlussRowAsPaimonRow {
private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
+ private final int businessFieldCount;
+ private final int bucketFieldIndex;
+ private final int offsetFieldIndex;
+ private final int timestampFieldIndex;
public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
super(tableTowType);
this.bucket = bucket;
+ this.businessFieldCount = tableRowType.getFieldCount() -
SYSTEM_COLUMNS.size();
+ this.bucketFieldIndex = businessFieldCount;
+ this.offsetFieldIndex = businessFieldCount + 1;
+ this.timestampFieldIndex = businessFieldCount + 2;
}
public void setFlussRecord(LogRecord logRecord) {
this.logRecord = logRecord;
this.internalRow = logRecord.getRow();
- this.originRowFieldCount = internalRow.getFieldCount();
- checkState(
- originRowFieldCount == tableRowType.getFieldCount() -
SYSTEM_COLUMNS.size(),
- "The paimon table fields count must equals to LogRecord's
fields count.");
+ int flussFieldCount = internalRow.getFieldCount();
+ if (flussFieldCount > businessFieldCount) {
+ // Fluss record is wider than Paimon schema, which means Lake
schema is not yet
+ // synchronized. With "Lake First" strategy, this should not
happen in normal cases.
+ throw new IllegalStateException(
+ String.format(
+ "Fluss record has %d fields but Paimon schema only
has %d business fields. "
+ + "This indicates the lake schema is not
yet synchronized. "
+ + "Please retry the schema change
operation.",
+ flussFieldCount, businessFieldCount));
+ }
+ this.originRowFieldCount = flussFieldCount;
}
@Override
public int getFieldCount() {
- return
- // business (including partitions) + system (three system fields:
bucket, offset,
+ // business (including partitions) + system (three system fields:
bucket, offset,
// timestamp)
- originRowFieldCount + SYSTEM_COLUMNS.size();
+ return tableRowType.getFieldCount();
}
@Override
@@ -68,28 +82,44 @@ public class FlussRecordAsPaimonRow extends
FlussRowAsPaimonRow {
if (pos < originRowFieldCount) {
return super.isNullAt(pos);
}
+ if (pos < businessFieldCount) {
+ // Padding NULL for missing business fields when Paimon schema is
wider than Fluss
+ return true;
+ }
// is the last three system fields: bucket, offset, timestamp which
are never null
return false;
}
@Override
public int getInt(int pos) {
- if (pos == originRowFieldCount) {
+ if (pos == bucketFieldIndex) {
// bucket system column
return bucket;
}
+ if (pos >= originRowFieldCount) {
+ throw new IllegalStateException(
+ String.format(
+ "Field %s is NULL because Paimon schema is wider
than Fluss record.",
+ pos));
+ }
return super.getInt(pos);
}
@Override
public long getLong(int pos) {
- if (pos == originRowFieldCount + 1) {
+ if (pos == offsetFieldIndex) {
// offset system column
return logRecord.logOffset();
- } else if (pos == originRowFieldCount + 2) {
+ } else if (pos == timestampFieldIndex) {
// timestamp system column
return logRecord.timestamp();
}
+ if (pos >= originRowFieldCount) {
+ throw new IllegalStateException(
+ String.format(
+ "Field %s is NULL because Paimon schema is wider
than Fluss record.",
+ pos));
+ }
// the origin RowData
return super.getLong(pos);
}
@@ -97,9 +127,15 @@ public class FlussRecordAsPaimonRow extends
FlussRowAsPaimonRow {
@Override
public Timestamp getTimestamp(int pos, int precision) {
// it's timestamp system column
- if (pos == originRowFieldCount + 2) {
+ if (pos == timestampFieldIndex) {
return Timestamp.fromEpochMillis(logRecord.timestamp());
}
+ if (pos >= originRowFieldCount) {
+ throw new IllegalStateException(
+ String.format(
+ "Field %s is NULL because Paimon schema is wider
than Fluss record.",
+ pos));
+ }
return super.getTimestamp(pos, precision);
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index 8d561957a..ded40ac59 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -128,6 +128,31 @@ public class PaimonConversions {
schemaChanges.add(
SchemaChange.removeOption(
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
+ } else if (tableChange instanceof TableChange.AddColumn) {
+ TableChange.AddColumn addColumn = (TableChange.AddColumn)
tableChange;
+
+ if (!(addColumn.getPosition() instanceof TableChange.Last)) {
+ throw new UnsupportedOperationException(
+ "Only support to add column at last for paimon
table.");
+ }
+
+ org.apache.fluss.types.DataType flussDataType =
addColumn.getDataType();
+ if (!flussDataType.isNullable()) {
+ throw new UnsupportedOperationException(
+ "Only support to add nullable column for paimon
table.");
+ }
+
+ org.apache.paimon.types.DataType paimonDataType =
+
flussDataType.accept(FlussDataTypeToPaimonDataType.INSTANCE);
+
+ String firstSystemColumnName =
SYSTEM_COLUMNS.keySet().iterator().next();
+ schemaChanges.add(
+ SchemaChange.addColumn(
+ addColumn.getName(),
+ paimonDataType,
+ addColumn.getComment(),
+ SchemaChange.Move.before(
+ addColumn.getName(),
firstSystemColumnName)));
} else {
throw new UnsupportedOperationException(
"Unsupported table change: " + tableChange.getClass());
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 699553db6..9491918b4 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -23,7 +23,6 @@ import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
-import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.LakeTableAlreadyExistException;
@@ -906,11 +905,26 @@ class LakeEnabledTableCreateITCase {
DataTypes.INT(),
"c3 comment",
TableChange.ColumnPosition.last()));
- assertThatThrownBy(() -> admin.alterTable(tablePath, tableChanges,
false).get())
- .cause()
- .isInstanceOf(InvalidAlterTableException.class)
- .hasMessage(
- "Schema evolution is currently not supported for
tables with datalake enabled.");
+
+ admin.alterTable(tablePath, tableChanges, false).get();
+
+ Table alteredPaimonTable =
+ paimonCatalog.getTable(Identifier.create(DATABASE,
tablePath.getTableName()));
+ // Verify the new column c3 with comment was added to Paimon table
+ RowType alteredRowType = alteredPaimonTable.rowType();
+ assertThat(alteredRowType.getFieldCount()).isEqualTo(6);
+ assertThat(alteredRowType.getFieldNames())
+ .containsExactly(
+ "c1",
+ "c2",
+ "c3",
+ BUCKET_COLUMN_NAME,
+ OFFSET_COLUMN_NAME,
+ TIMESTAMP_COLUMN_NAME);
+ // Verify c3 column has the correct type and comment
+ assertThat(alteredRowType.getField("c3").type())
+ .isEqualTo(org.apache.paimon.types.DataTypes.INT());
+ assertThat(alteredRowType.getField("c3").description()).isEqualTo("c3
comment");
}
private void verifyPaimonTable(
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
index 4c99afdf7..ea18b85f6 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -18,6 +18,7 @@
package org.apache.fluss.lake.paimon;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
import org.apache.fluss.metadata.Schema;
@@ -34,6 +35,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.Collections;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -113,6 +115,134 @@ class PaimonLakeCatalogTest {
.hasMessage("Table alter_props_db.non_existing_table does not
exist.");
}
+ @Test
+ void testAlterTableAddColumnLastNullable() throws Exception {
+ String database = "test_alter_table_add_column_db";
+ String tableName = "test_alter_table_add_column_table";
+ TablePath tablePath = TablePath.of(database, tableName);
+ Identifier identifier = Identifier.create(database, tableName);
+ createTable(database, tableName);
+
+ List<TableChange> changes =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "new_col",
+ DataTypes.INT(),
+ "new_col comment",
+ TableChange.ColumnPosition.last()));
+
+ flussPaimonCatalog.alterTable(tablePath, changes, new
TestingLakeCatalogContext());
+
+ Table table =
flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
+ assertThat(table.rowType().getFieldNames())
+ .containsSequence(
+ "id",
+ "name",
+ "amount",
+ "address",
+ "new_col",
+ "__bucket",
+ "__offset",
+ "__timestamp");
+ }
+
+ @Test
+ void testAlterTableAddColumnNotLast() {
+ String database = "test_alter_table_add_column_not_last_db";
+ String tableName = "test_alter_table_add_column_not_last_table";
+ TablePath tablePath = TablePath.of(database, tableName);
+ createTable(database, tableName);
+
+ List<TableChange> changes =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "new_col",
+ DataTypes.INT(),
+ null,
+ TableChange.ColumnPosition.first()));
+
+ assertThatThrownBy(
+ () ->
+ flussPaimonCatalog.alterTable(
+ tablePath, changes, new
TestingLakeCatalogContext()))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Only support to add column at last for paimon
table.");
+ }
+
+ @Test
+ void testAlterTableAddColumnNotNullable() {
+ String database = "test_alter_table_add_column_not_nullable_db";
+ String tableName = "test_alter_table_add_column_not_nullable_table";
+ TablePath tablePath = TablePath.of(database, tableName);
+ createTable(database, tableName);
+
+ List<TableChange> changes =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "new_col",
+ DataTypes.INT().copy(false),
+ null,
+ TableChange.ColumnPosition.last()));
+
+ assertThatThrownBy(
+ () ->
+ flussPaimonCatalog.alterTable(
+ tablePath, changes, new
TestingLakeCatalogContext()))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Only support to add nullable column for paimon
table.");
+ }
+
+ @Test
+ void testAlterTableAddExistingColumn() {
+ String database = "test_alter_table_add_existing_column_db";
+ String tableName = "test_alter_table_add_existing_column_table";
+ TablePath tablePath = TablePath.of(database, tableName);
+ createTable(database, tableName);
+
+ List<TableChange> changes =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "address",
+ DataTypes.STRING(),
+ null,
+ TableChange.ColumnPosition.last()));
+
+ // no exception thrown when adding existing column
+ flussPaimonCatalog.alterTable(tablePath, changes, new
TestingLakeCatalogContext());
+
+ List<TableChange> changes2 =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "address",
+ DataTypes.INT(),
+ null,
+ TableChange.ColumnPosition.last()));
+
+ assertThatThrownBy(
+ () ->
+ flussPaimonCatalog.alterTable(
+ tablePath, changes2, new
TestingLakeCatalogContext()))
+ .isInstanceOf(InvalidAlterTableException.class)
+ .hasMessage(
+ "Column 'address' already exists but with different
type. Existing: STRING, Expected: INT");
+
+ List<TableChange> changes3 =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "address",
+ DataTypes.STRING(),
+ "the address comment",
+ TableChange.ColumnPosition.last()));
+
+ assertThatThrownBy(
+ () ->
+ flussPaimonCatalog.alterTable(
+ tablePath, changes3, new
TestingLakeCatalogContext()))
+ .isInstanceOf(InvalidAlterTableException.class)
+ .hasMessage(
+ "Column address already exists but with different
comment. Existing: null, Expected: the address comment");
+ }
+
private void createTable(String database, String tableName) {
Schema flussSchema =
Schema.newBuilder()
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
index 79b548966..369fc7498 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java
@@ -40,6 +40,7 @@ import static org.apache.fluss.record.ChangeType.INSERT;
import static org.apache.fluss.record.ChangeType.UPDATE_AFTER;
import static org.apache.fluss.record.ChangeType.UPDATE_BEFORE;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link FlussRecordAsPaimonRow}. */
class FlussRecordAsPaimonRowTest {
@@ -545,4 +546,64 @@ class FlussRecordAsPaimonRowTest {
assertThat(array).isNotNull();
assertThat(array.size()).isEqualTo(0);
}
+
+ @Test
+ void testPaimonSchemaWiderThanFlussRecord() {
+ int tableBucket = 0;
+ RowType tableRowType =
+ RowType.of(
+ new org.apache.paimon.types.BooleanType(),
+ new org.apache.paimon.types.VarCharType(),
+ // append three system columns: __bucket,
__offset,__timestamp
+ new org.apache.paimon.types.IntType(),
+ new org.apache.paimon.types.BigIntType(),
+ new
org.apache.paimon.types.LocalZonedTimestampType(3));
+
+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+ new FlussRecordAsPaimonRow(tableBucket, tableRowType);
+
+ long logOffset = 7L;
+ long timeStamp = System.currentTimeMillis();
+ GenericRow genericRow = new GenericRow(1);
+ genericRow.setField(0, true);
+ LogRecord logRecord = new GenericRecord(logOffset, timeStamp,
APPEND_ONLY, genericRow);
+ flussRecordAsPaimonRow.setFlussRecord(logRecord);
+
+ assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(5);
+
+ assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
+ assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
+ assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
+ assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
+ assertThat(flussRecordAsPaimonRow.getTimestamp(4, 3))
+ .isEqualTo(Timestamp.fromEpochMillis(timeStamp));
+ }
+
+ @Test
+ void testFlussRecordWiderThanPaimonSchema() {
+ int tableBucket = 0;
+ RowType tableRowType =
+ RowType.of(
+ new org.apache.paimon.types.BooleanType(),
+ // append three system columns: __bucket,
__offset,__timestamp
+ new org.apache.paimon.types.IntType(),
+ new org.apache.paimon.types.BigIntType(),
+ new
org.apache.paimon.types.LocalZonedTimestampType(3));
+
+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+ new FlussRecordAsPaimonRow(tableBucket, tableRowType);
+
+ long logOffset = 7L;
+ long timeStamp = System.currentTimeMillis();
+ GenericRow genericRow = new GenericRow(2);
+ genericRow.setField(0, true);
+ genericRow.setField(1, BinaryString.fromString("extra"));
+ LogRecord logRecord = new GenericRecord(logOffset, timeStamp,
APPEND_ONLY, genericRow);
+
+ // Should throw exception instead of silently truncating data
+ assertThatThrownBy(() ->
flussRecordAsPaimonRow.setFlussRecord(logRecord))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "Fluss record has 2 fields but Paimon schema only has
1 business fields");
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index d725038d6..73aba60cb 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -559,8 +559,9 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
InternalRow flussRow = flussRowIterator.next();
assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0));
assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString());
- // the idx 2 is __bucket, so use 3
- assertThat(row.getLong(3)).isEqualTo(startingOffset++);
+ // system columns are always the last three: __bucket, __offset,
__timestamp
+ int offsetIndex = row.getFieldCount() - 2;
+ assertThat(row.getLong(offsetIndex)).isEqualTo(startingOffset++);
}
assertThat(flussRowIterator.hasNext()).isFalse();
}
@@ -603,6 +604,75 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
return reader.toCloseableIterator();
}
+ @Test
+ void testTieringWithAddColumn() throws Exception {
+ // Test ADD COLUMN during tiering with "Lake First" strategy
+
+ // 1. Create a datalake enabled table with initial schema (c1: INT,
c2: STRING)
+ TablePath tablePath = TablePath.of(DEFAULT_DB, "addColumnTable");
+ long tableId = createLogTable(tablePath);
+ TableBucket tableBucket = new TableBucket(tableId, 0);
+
+ // 2. Write initial data before ADD COLUMN
+ List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2,
"v2"), row(3, "v3"));
+ writeRows(tablePath, initialRows, true);
+
+ // 3. Start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ try {
+ // 4. Wait for initial data to be tiered
+ assertReplicaStatus(tableBucket, 3);
+
+ // 5. Execute ADD COLUMN (c3: INT, nullable)
+ List<TableChange> addColumnChanges =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "c3",
+ DataTypes.INT(),
+ "new column",
+ TableChange.ColumnPosition.last()));
+ admin.alterTable(tablePath, addColumnChanges, false).get();
+
+ // 6. Write more data after ADD COLUMN (with new column value)
+ // schema now has 3 business columns (c1, c2, c3), so provide
value for the new column
+ List<InternalRow> newRows =
+ Arrays.asList(row(4, "v4", 40), row(5, "v5", 50), row(6,
"v6", 60));
+ writeRows(tablePath, newRows, true);
+
+ // 7. Wait for new data to be tiered
+ assertReplicaStatus(tableBucket, 6);
+
+ // 8. Verify Paimon table has the new column with exact field
names and order
+ Identifier tableIdentifier =
+ Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
+ FileStoreTable paimonTable = (FileStoreTable)
paimonCatalog.getTable(tableIdentifier);
+ List<String> fieldNames = paimonTable.rowType().getFieldNames();
+
+ // Should have exact fields in order: a, b, c3, __bucket,
__offset, __timestamp
+ assertThat(fieldNames)
+ .containsExactly("a", "b", "c3", "__bucket", "__offset",
"__timestamp");
+
+ // 9. Verify both schema evolution and data correctness
+ // For initial rows (before ADD COLUMN), c3 should be NULL
+ // For new rows (after ADD COLUMN), c3 should have the provided
values
+ List<InternalRow> expectedRows = new ArrayList<>();
+ // Initial rows with NULL for c3
+ expectedRows.add(row(1, "v1", null));
+ expectedRows.add(row(2, "v2", null));
+ expectedRows.add(row(3, "v3", null));
+ // New rows with c3 values
+ expectedRows.add(row(4, "v4", 40));
+ expectedRows.add(row(5, "v5", 50));
+ expectedRows.add(row(6, "v6", 60));
+
+ checkDataInPaimonAppendOnlyTable(tablePath, expectedRows, 0);
+
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
@Override
protected FlussClusterExtension getFlussClusterExtension() {
return FLUSS_CLUSTER_EXTENSION;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 3d5811207..4239b6167 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -328,14 +328,21 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
+ "table properties or table schema.");
}
+ LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
+ lakeCatalogDynamicLoader.getLakeCatalogContainer();
+ LakeCatalog.Context lakeCatalogContext =
+ new DefaultLakeCatalogContext(false,
currentSession().getPrincipal());
+
if (!alterSchemaChanges.isEmpty()) {
metadataManager.alterTableSchema(
- tablePath, alterSchemaChanges,
request.isIgnoreIfNotExists());
+ tablePath,
+ alterSchemaChanges,
+ request.isIgnoreIfNotExists(),
+ lakeCatalogContainer.getLakeCatalog(),
+ lakeCatalogContext);
}
if (!alterTableConfigChanges.isEmpty()) {
- LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer
=
- lakeCatalogDynamicLoader.getLakeCatalogContainer();
metadataManager.alterTableProperties(
tablePath,
alterTableConfigChanges,
@@ -343,7 +350,7 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
request.isIgnoreIfNotExists(),
lakeCatalogContainer.getLakeCatalog(),
lakeTableTieringManager,
- new DefaultLakeCatalogContext(false,
currentSession().getPrincipal()));
+ lakeCatalogContext);
}
return CompletableFuture.completedFuture(new AlterTableResponse());
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index a2fc6cf3d..9d5dc3479 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -322,24 +322,33 @@ public class MetadataManager {
}
public void alterTableSchema(
- TablePath tablePath, List<TableChange> schemaChanges, boolean
ignoreIfNotExists)
+ TablePath tablePath,
+ List<TableChange> schemaChanges,
+ boolean ignoreIfNotExists,
+ @Nullable LakeCatalog lakeCatalog,
+ LakeCatalog.Context lakeCatalogContext)
throws TableNotExistException, TableNotPartitionedException {
try {
TableInfo table = getTable(tablePath);
- // TODO: remote this after lake enable table support schema
evolution, track by
- // https://github.com/apache/fluss/issues/2128
- if (table.getTableConfig().isDataLakeEnabled()) {
- throw new InvalidAlterTableException(
- "Schema evolution is currently not supported for
tables with datalake enabled.");
- }
-
// validate the table column changes
if (!schemaChanges.isEmpty()) {
Schema newSchema = SchemaUpdate.applySchemaChanges(table,
schemaChanges);
- // update the schema
- zookeeperClient.registerSchema(tablePath, newSchema,
table.getSchemaId() + 1);
+
+ // Lake First: sync to Lake before updating Fluss schema
+ syncSchemaChangesToLake(
+ tablePath, table, schemaChanges, lakeCatalog,
lakeCatalogContext);
+
+ // Update Fluss schema (ZK) after Lake sync succeeds
+ if (!newSchema.equals(table.getSchema())) {
+ zookeeperClient.registerSchema(tablePath, newSchema,
table.getSchemaId() + 1);
+ } else {
+ LOG.info(
+ "Skipping schema evolution for table {} because
the column(s) to add {} already exist.",
+ tablePath,
+ schemaChanges);
+ }
}
} catch (Exception e) {
if (e instanceof TableNotExistException) {
@@ -355,6 +364,34 @@ public class MetadataManager {
}
}
+ private void syncSchemaChangesToLake(
+ TablePath tablePath,
+ TableInfo tableInfo,
+ List<TableChange> schemaChanges,
+ @Nullable LakeCatalog lakeCatalog,
+ LakeCatalog.Context lakeCatalogContext) {
+ if (!isDataLakeEnabled(tableInfo.toTableDescriptor())) {
+ return;
+ }
+
+ if (lakeCatalog == null) {
+ throw new InvalidAlterTableException(
+ "Cannot alter schema for datalake enabled table "
+ + tablePath
+ + ", because the Fluss cluster doesn't enable
datalake tables.");
+ }
+
+ try {
+ lakeCatalog.alterTable(tablePath, schemaChanges,
lakeCatalogContext);
+ } catch (TableNotExistException e) {
+ throw new FlussRuntimeException(
+ "Lake table doesn't exist for lake-enabled table "
+ + tablePath
+ + ", which shouldn't happen. Please check if the
lake table was deleted manually.",
+ e);
+ }
+ }
+
public void alterTableProperties(
TablePath tablePath,
List<TableChange> tableChanges,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index 9efab2a51..3162e4bed 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/** Schema update. */
@@ -83,9 +84,16 @@ public class SchemaUpdate {
}
private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
- if (existedColumns.containsKey(addColumn.getName())) {
- throw new IllegalArgumentException(
- "Column " + addColumn.getName() + " already exists.");
+ Schema.Column existingColumn = existedColumns.get(addColumn.getName());
+ if (existingColumn != null) {
+ // Allow idempotent retries: if column name/type/comment match
existing, treat as no-op
+ if (!existingColumn.getDataType().equals(addColumn.getDataType())
+ || !Objects.equals(
+ existingColumn.getComment().orElse(null),
addColumn.getComment())) {
+ throw new IllegalArgumentException(
+ "Column " + addColumn.getName() + " already exists.");
+ }
+ return this;
}
TableChange.ColumnPosition position = addColumn.getPosition();
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 5e99b0023..dcfa6b5b3 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -1230,7 +1230,7 @@ class CoordinatorEventProcessorTest {
}
private void alterTable(TablePath tablePath, List<TableChange>
schemaChanges) {
- metadataManager.alterTableSchema(tablePath, schemaChanges, true);
+ metadataManager.alterTableSchema(tablePath, schemaChanges, true, null,
null);
}
private TableDescriptor getPartitionedTable() {
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
index 0ed0c9d95..bb9a25d1d 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
@@ -301,7 +301,9 @@ class TableChangeWatcherTest {
DataTypes.INT(),
null,
TableChange.ColumnPosition.last())),
- false);
+ false,
+ null,
+ null);
Schema newSchema =
Schema.newBuilder()
.fromSchema(tableInfo.getSchema())