This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 96faf08cb [lake/paimon] Compare paimon schema and Fluss schema before
alter table. (#2331)
96faf08cb is described below
commit 96faf08cbd84d1d43a2ef610c189829fa9c34e76
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Jan 29 14:30:22 2026 +0800
[lake/paimon] Compare paimon schema and Fluss schema before alter table.
(#2331)
---
.../fluss/client/admin/FlussAdminITCase.java | 16 ++
.../apache/fluss/lake/lakestorage/LakeCatalog.java | 14 ++
.../lakestorage/TestingLakeCatalogContext.java | 27 +++
.../fluss/lake/paimon/PaimonLakeCatalog.java | 136 +++++---------
.../fluss/lake/paimon/utils/PaimonConversions.java | 3 +-
.../lake/paimon/utils/PaimonTableValidation.java | 43 +++--
.../fluss/lake/paimon/PaimonLakeCatalogTest.java | 202 +++++++++++++++++----
.../server/coordinator/CoordinatorService.java | 36 +++-
.../fluss/server/coordinator/MetadataManager.java | 27 ++-
.../fluss/server/coordinator/SchemaUpdate.java | 22 +--
10 files changed, 355 insertions(+), 171 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 5560e14ee..5dff27b58 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -436,6 +436,22 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
DataTypeChecks.equalsWithFieldId(
schemaInfo.getSchema().getRowType(),
expectedSchema.getRowType()))
.isTrue();
+
+ assertThatThrownBy(
+ () ->
+ admin.alterTable(
+ tablePath,
+ Collections.singletonList(
+ TableChange.addColumn(
+ "nested_row",
+ DataTypes.ROW(
+
DataTypes.STRING(),
+
DataTypes.INT()),
+ "new nested
column",
+
TableChange.ColumnPosition.last())),
+ false)
+ .get())
+ .hasMessageContaining("Column nested_row already exists");
}
@Test
diff --git
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
index 4cbccb6c1..7fa2f3ea1 100644
---
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
+++
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
@@ -25,6 +25,8 @@ import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.security.acl.FlussPrincipal;
+import javax.annotation.Nullable;
+
import java.util.List;
/**
@@ -84,5 +86,17 @@ public interface LakeCatalog extends AutoCloseable {
/** Get the fluss principal currently accessing the catalog. */
FlussPrincipal getFlussPrincipal();
+
+ /**
+ * Get the current table info of fluss.
+ *
+ * @return the current table info of fluss. Null if the table does not
exist.
+ * @since 0.10
+ */
+ @Nullable
+ TableDescriptor getCurrentTable();
+
+ /** Get the expected table info of fluss. */
+ TableDescriptor getExpectedTable();
}
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
index 7406b13b6..d3efd712e 100644
---
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
+++
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
@@ -17,11 +17,28 @@
package org.apache.fluss.lake.lakestorage;
+import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.security.acl.FlussPrincipal;
/** A testing implementation of {@link LakeCatalog.Context}. */
public class TestingLakeCatalogContext implements LakeCatalog.Context {
+ private final TableDescriptor currentTable;
+ private final TableDescriptor expectedTable;
+
+ public TestingLakeCatalogContext(TableDescriptor tableDescriptor) {
+ this(tableDescriptor, tableDescriptor);
+ }
+
+ public TestingLakeCatalogContext(TableDescriptor currentTable,
TableDescriptor expectedTable) {
+ this.currentTable = currentTable;
+ this.expectedTable = expectedTable;
+ }
+
+ public TestingLakeCatalogContext() {
+ this(null);
+ }
+
@Override
public boolean isCreatingFlussTable() {
return false;
@@ -31,4 +48,14 @@ public class TestingLakeCatalogContext implements
LakeCatalog.Context {
public FlussPrincipal getFlussPrincipal() {
return null;
}
+
+ @Override
+ public TableDescriptor getCurrentTable() {
+ return currentTable;
+ }
+
+ @Override
+ public TableDescriptor getExpectedTable() {
+ return expectedTable;
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 500546e64..6548b7e26 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -44,12 +44,13 @@ import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
import static
org.apache.fluss.lake.paimon.utils.PaimonTableValidation.checkTableIsEmpty;
-import static
org.apache.fluss.lake.paimon.utils.PaimonTableValidation.validatePaimonSchemaCompatible;
+import static
org.apache.fluss.lake.paimon.utils.PaimonTableValidation.isPaimonSchemaCompatible;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -111,94 +112,51 @@ public class PaimonLakeCatalog implements LakeCatalog {
@Override
public void alterTable(TablePath tablePath, List<TableChange>
tableChanges, Context context)
throws TableNotExistException {
- try {
- List<SchemaChange> paimonSchemaChanges =
toPaimonSchemaChanges(tableChanges);
-
- // Compare current Paimon table schema with expected target schema
before altering
- if (shouldAlterTable(tablePath, tableChanges)) {
- alterTable(tablePath, paimonSchemaChanges);
- } else {
- // If schemas already match, treat as idempotent success
- LOG.info(
- "Skipping schema evolution for Paimon table {} because
the column(s) to add {} already exist.",
- tablePath,
- tableChanges);
- }
- } catch (Catalog.ColumnAlreadyExistException e) {
- // This shouldn't happen if shouldAlterTable works correctly, but
keep as safeguard
- throw new InvalidAlterTableException(e.getMessage());
- } catch (Catalog.ColumnNotExistException e) {
- // This shouldn't happen for AddColumn operations
- throw new InvalidAlterTableException(e.getMessage());
- }
- }
-
- private boolean shouldAlterTable(TablePath tablePath, List<TableChange>
tableChanges)
- throws TableNotExistException {
try {
Table table = paimonCatalog.getTable(toPaimon(tablePath));
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Schema currentSchema = fileStoreTable.schema().toSchema();
-
- for (TableChange change : tableChanges) {
- if (change instanceof TableChange.AddColumn) {
- TableChange.AddColumn addColumn = (TableChange.AddColumn)
change;
- if (!isColumnAlreadyExists(currentSchema, addColumn)) {
- return true;
- }
- } else {
- return true;
- }
+ Schema currentPaimonSchema = fileStoreTable.schema().toSchema();
+
+ List<SchemaChange> paimonSchemaChanges;
+ if (isPaimonSchemaCompatible(
+ currentPaimonSchema,
toPaimonSchema(context.getCurrentTable()))) {
+            // if the paimon schema is the same as the current fluss schema,
directly apply all the
+ // changes.
+ paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
+ } else if (isPaimonSchemaCompatible(
+ currentPaimonSchema,
toPaimonSchema(context.getExpectedTable()))) {
+            // if the schema is the same as the applied fluss schema, skip adding
columns.
+ paimonSchemaChanges =
+ toPaimonSchemaChanges(
+ tableChanges.stream()
+ .filter(
+ tableChange ->
+ !(tableChange
+ instanceof
TableChange.AddColumn))
+ .collect(Collectors.toList()));
+ } else {
+ throw new InvalidAlterTableException(
+ String.format(
+ "Paimon schema is not compatible with Fluss
schema: "
+ + "Paimon schema: %s, Fluss schema:
%s. "
+ + "therefore you need to add the diff
columns all at once, "
+ + "rather than applying other table
changes: %s.",
+ currentPaimonSchema,
+ context.getCurrentTable().getSchema(),
+ tableChanges));
}
- return false;
+ if (!paimonSchemaChanges.isEmpty()) {
+ paimonCatalog.alterTable(toPaimon(tablePath),
paimonSchemaChanges, false);
+ }
+ } catch (Catalog.ColumnAlreadyExistException |
Catalog.ColumnNotExistException e) {
+ // This shouldn't happen for AddColumn operations
+ throw new InvalidAlterTableException(e.getMessage());
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException("Table " + tablePath + " does not
exist.");
}
}
- private boolean isColumnAlreadyExists(Schema currentSchema,
TableChange.AddColumn addColumn) {
- String columnName = addColumn.getName();
-
- for (org.apache.paimon.types.DataField field : currentSchema.fields())
{
- if (field.name().equals(columnName)) {
- org.apache.paimon.types.DataType expectedType =
- addColumn
- .getDataType()
- .accept(
- org.apache.fluss.lake.paimon.utils
-
.FlussDataTypeToPaimonDataType.INSTANCE);
-
- if (!field.type().equals(expectedType)) {
- throw new InvalidAlterTableException(
- String.format(
- "Column '%s' already exists but with
different type. "
- + "Existing: %s, Expected: %s",
- columnName, field.type(), expectedType));
- }
- String existingComment = field.description();
- String expectedComment = addColumn.getComment();
-
- boolean commentsMatch =
- (existingComment == null && expectedComment == null)
- || (existingComment != null
- &&
existingComment.equals(expectedComment));
-
- if (!commentsMatch) {
- throw new InvalidAlterTableException(
- String.format(
- "Column %s already exists but with
different comment. "
- + "Existing: %s, Expected: %s",
- columnName, existingComment,
expectedComment));
- }
-
- return true;
- }
- }
-
- return false;
- }
-
private void createTable(TablePath tablePath, Schema schema, boolean
isCreatingFlussTable)
throws Catalog.DatabaseNotExistException {
Identifier paimonPath = toPaimon(tablePath);
@@ -209,8 +167,15 @@ public class PaimonLakeCatalog implements LakeCatalog {
try {
Table table = paimonCatalog.getTable(paimonPath);
FileStoreTable fileStoreTable = (FileStoreTable) table;
- validatePaimonSchemaCompatible(
- paimonPath, fileStoreTable.schema().toSchema(),
schema);
+ Schema existingSchema = fileStoreTable.schema().toSchema();
+ if (!isPaimonSchemaCompatible(existingSchema, schema)) {
+ throw new TableAlreadyExistException(
+ String.format(
+ "The table %s already exists in Paimon
catalog, but the table schema is not compatible. "
+ + "Existing schema: %s, new
schema: %s. "
+ + "Please first drop the table in
Paimon catalog or use a new table name.",
+ paimonPath.getEscapedFullName(),
existingSchema, schema));
+ }
// if creating a new fluss table, we should ensure the lake
table is empty
if (isCreatingFlussTable) {
checkTableIsEmpty(tablePath, fileStoreTable);
@@ -237,15 +202,6 @@ public class PaimonLakeCatalog implements LakeCatalog {
}
}
- private void alterTable(TablePath tablePath, List<SchemaChange>
tableChanges)
- throws Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
- try {
- paimonCatalog.alterTable(toPaimon(tablePath), tableChanges, false);
- } catch (Catalog.TableNotExistException e) {
- throw new TableNotExistException("Table " + tablePath + " does not
exist.");
- }
- }
-
@Override
public void close() {
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index ded40ac59..ee6d0f0d7 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -57,6 +57,7 @@ public class PaimonConversions {
// for fluss config
public static final String FLUSS_CONF_PREFIX = "fluss.";
+ public static final String TABLE_DATALAKE_PAIMON_PREFIX =
"table.datalake.paimon.";
// for paimon config
private static final String PAIMON_CONF_PREFIX = "paimon.";
@@ -261,7 +262,7 @@ public class PaimonConversions {
private static void setFlussPropertyToPaimon(String key, String value,
Options options) {
if (key.startsWith(PAIMON_CONF_PREFIX)) {
options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
- } else {
+ } else if (!key.startsWith(TABLE_DATALAKE_PAIMON_PREFIX)) {
options.set(FLUSS_CONF_PREFIX + key, value);
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
index 94580df17..a15feefeb 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java
@@ -21,7 +21,6 @@ import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.metadata.TablePath;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
@@ -34,6 +33,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
@@ -43,8 +43,7 @@ public class PaimonTableValidation {
private static final Map<String, ConfigOption<?>> PAIMON_CONFIGS =
extractPaimonConfigs();
- public static void validatePaimonSchemaCompatible(
- Identifier tablePath, Schema existingSchema, Schema newSchema) {
+ public static boolean isPaimonSchemaCompatible(Schema existingSchema,
Schema newSchema) {
// Adjust options for comparison
Map<String, String> existingOptions = existingSchema.options();
Map<String, String> newOptions = newSchema.options();
@@ -66,21 +65,16 @@ public class PaimonTableValidation {
// ignore the existing options that are not in new options
existingOptions.entrySet().removeIf(entry ->
!newOptions.containsKey(entry.getKey()));
- if (!existingSchema.equals(newSchema)) {
+        // ignore the field ids because newSchema is derived from the fluss
schema, whose field ids may not be
+        // the same as existingSchema's.
+ if (!equalIgnoreFieldId(existingSchema, newSchema)) {
// Allow different precisions for __timestamp column for backward
compatibility,
// old cluster will use precision 6, but new cluster will use
precision 3,
// we allow such precision difference
- if (equalIgnoreSystemColumnTimestampPrecision(existingSchema,
newSchema)) {
- return;
- }
-
- throw new TableAlreadyExistException(
- String.format(
- "The table %s already exists in Paimon catalog,
but the table schema is not compatible. "
- + "Existing schema: %s, new schema: %s. "
- + "Please first drop the table in Paimon
catalog or use a new table name.",
- tablePath.getEscapedFullName(), existingSchema,
newSchema));
+ return equalIgnoreSystemColumnTimestampPrecision(existingSchema,
newSchema);
}
+
+ return true;
}
/**
@@ -96,7 +90,7 @@ public class PaimonTableValidation {
* @param newSchema the new schema descriptor generated by the current
Fluss cluster
* @return true if the schemas are identical, disregarding the precision
of the system timestamp
*/
- private static boolean equalIgnoreSystemColumnTimestampPrecision(
+ public static boolean equalIgnoreSystemColumnTimestampPrecision(
Schema existingSchema, Schema newSchema) {
List<DataField> existingFields = new
ArrayList<>(existingSchema.fields());
DataField systemTimestampField =
existingFields.get(existingFields.size() - 1);
@@ -113,7 +107,24 @@ public class PaimonTableValidation {
systemTimestampField.description()));
}
existingSchema =
existingSchema.copy(RowType.of(existingFields.toArray(new DataField[0])));
- return existingSchema.equals(newSchema);
+ return equalIgnoreFieldId(existingSchema, newSchema);
+ }
+
+ private static boolean equalIgnoreFieldId(Schema existingSchema, Schema
newSchema) {
+ List<DataField> existingFields = existingSchema.fields();
+ List<DataField> newFields = newSchema.fields();
+ for (int i = 0; i < existingFields.size(); i++) {
+ DataField existingField = existingFields.get(i);
+ DataField newField = newFields.get(i);
+ if (!existingField.equalsIgnoreFieldId(newField)) {
+ return false;
+ }
+ }
+
+ return Objects.equals(existingSchema.partitionKeys(),
newSchema.partitionKeys())
+ && Objects.equals(existingSchema.primaryKeys(),
newSchema.primaryKeys())
+ && Objects.equals(existingSchema.options(),
newSchema.options())
+ && Objects.equals(existingSchema.comment(),
newSchema.comment());
}
private static void removeChangeableOptions(Map<String, String> options) {
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
index ea18b85f6..b56ba17cb 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -25,35 +25,57 @@ import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.coordinator.SchemaUpdate;
import org.apache.fluss.types.DataTypes;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
+import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Unit test for {@link PaimonLakeCatalog}. */
class PaimonLakeCatalogTest {
+ private static final Schema FLUSS_SCHEMA =
+ Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("name", DataTypes.STRING())
+ .column("amount", DataTypes.INT())
+ .column("address", DataTypes.STRING())
+ .build();
+ private static final TestingLakeCatalogContext LAKE_CATALOG_CONTEXT =
+ new
TestingLakeCatalogContext(TableDescriptor.builder().schema(FLUSS_SCHEMA).build());
@TempDir private File tempWarehouseDir;
private PaimonLakeCatalog flussPaimonCatalog;
@BeforeEach
- public void setUp() {
+ void setUp() {
Configuration configuration = new Configuration();
configuration.setString("warehouse",
tempWarehouseDir.toURI().toString());
flussPaimonCatalog = new PaimonLakeCatalog(configuration);
}
+ @AfterEach
+ void cleanup() {
+ flussPaimonCatalog.close();
+ setUp();
+ }
+
@Test
void testAlterTableProperties() throws Exception {
String database = "test_alter_table_properties_db";
@@ -70,7 +92,7 @@ class PaimonLakeCatalogTest {
flussPaimonCatalog.alterTable(
tablePath,
Collections.singletonList(TableChange.set("key", "value")),
- new TestingLakeCatalogContext());
+ LAKE_CATALOG_CONTEXT);
table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
// we have set the value for key
@@ -80,7 +102,7 @@ class PaimonLakeCatalogTest {
flussPaimonCatalog.alterTable(
tablePath,
Collections.singletonList(TableChange.reset("key")),
- new TestingLakeCatalogContext());
+ LAKE_CATALOG_CONTEXT);
table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
// we have reset the value for key
@@ -89,14 +111,13 @@ class PaimonLakeCatalogTest {
@Test
void alterTablePropertiesWithNonExistentTable() {
- TestingLakeCatalogContext context = new TestingLakeCatalogContext();
// db & table don't exist
assertThatThrownBy(
() ->
flussPaimonCatalog.alterTable(
TablePath.of("non_existing_db",
"non_existing_table"),
Collections.singletonList(TableChange.set("key", "value")),
- context))
+ LAKE_CATALOG_CONTEXT))
.isInstanceOf(TableNotExistException.class)
.hasMessage("Table non_existing_db.non_existing_table does not
exist.");
@@ -110,7 +131,7 @@ class PaimonLakeCatalogTest {
flussPaimonCatalog.alterTable(
TablePath.of(database,
"non_existing_table"),
Collections.singletonList(TableChange.set("key", "value")),
- context))
+ LAKE_CATALOG_CONTEXT))
.isInstanceOf(TableNotExistException.class)
.hasMessage("Table alter_props_db.non_existing_table does not
exist.");
}
@@ -131,7 +152,7 @@ class PaimonLakeCatalogTest {
"new_col comment",
TableChange.ColumnPosition.last()));
- flussPaimonCatalog.alterTable(tablePath, changes, new
TestingLakeCatalogContext());
+ flussPaimonCatalog.alterTable(tablePath, changes,
LAKE_CATALOG_CONTEXT);
Table table =
flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
assertThat(table.rowType().getFieldNames())
@@ -164,7 +185,7 @@ class PaimonLakeCatalogTest {
assertThatThrownBy(
() ->
flussPaimonCatalog.alterTable(
- tablePath, changes, new
TestingLakeCatalogContext()))
+ tablePath, changes,
LAKE_CATALOG_CONTEXT))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Only support to add column at last for paimon
table.");
}
@@ -187,18 +208,17 @@ class PaimonLakeCatalogTest {
assertThatThrownBy(
() ->
flussPaimonCatalog.alterTable(
- tablePath, changes, new
TestingLakeCatalogContext()))
+ tablePath, changes,
LAKE_CATALOG_CONTEXT))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Only support to add nullable column for paimon
table.");
}
@Test
- void testAlterTableAddExistingColumn() {
+ void testAlterTableAddExistingColumns() throws Exception {
String database = "test_alter_table_add_existing_column_db";
String tableName = "test_alter_table_add_existing_column_table";
TablePath tablePath = TablePath.of(database, tableName);
createTable(database, tableName);
-
List<TableChange> changes =
Collections.singletonList(
TableChange.addColumn(
@@ -207,13 +227,40 @@ class PaimonLakeCatalogTest {
null,
TableChange.ColumnPosition.last()));
- // no exception thrown when adding existing column
- flussPaimonCatalog.alterTable(tablePath, changes, new
TestingLakeCatalogContext());
+ assertThatThrownBy(
+ () ->
+ flussPaimonCatalog.alterTable(
+ tablePath,
+ changes,
+ getLakeCatalogContext(FLUSS_SCHEMA,
changes)))
+ .isInstanceOf(InvalidAlterTableException.class)
+ .hasMessageContaining("Column address already exists");
List<TableChange> changes2 =
- Collections.singletonList(
+ Arrays.asList(
TableChange.addColumn(
- "address",
+ "new_column",
+ DataTypes.INT(),
+ null,
+ TableChange.ColumnPosition.last()),
+ TableChange.addColumn(
+ "new_column2",
+ DataTypes.STRING(),
+ null,
+ TableChange.ColumnPosition.last()));
+
+ // mock add columns to paimon successfully but fail to add columns to
fluss.
+ flussPaimonCatalog.alterTable(
+ tablePath, changes2, getLakeCatalogContext(FLUSS_SCHEMA,
changes2));
+ List<TableChange> changes3 =
+ Arrays.asList(
+ TableChange.addColumn(
+ "new_column",
+ DataTypes.INT(),
+ null,
+ TableChange.ColumnPosition.last()),
+ TableChange.addColumn(
+ "new_column2",
DataTypes.INT(),
null,
TableChange.ColumnPosition.last()));
@@ -221,15 +268,25 @@ class PaimonLakeCatalogTest {
assertThatThrownBy(
() ->
flussPaimonCatalog.alterTable(
- tablePath, changes2, new
TestingLakeCatalogContext()))
+ tablePath,
+ changes3,
+ getLakeCatalogContext(FLUSS_SCHEMA,
changes3)))
.isInstanceOf(InvalidAlterTableException.class)
- .hasMessage(
- "Column 'address' already exists but with different
type. Existing: STRING, Expected: INT");
-
- List<TableChange> changes3 =
- Collections.singletonList(
+ .hasMessageContaining("Paimon schema is not compatible with
Fluss schema")
+ .hasMessageContaining(
+ String.format(
+ "therefore you need to add the diff columns
all at once, rather than applying other table changes: %s.",
+ changes3));
+
+ List<TableChange> changes4 =
+ Arrays.asList(
TableChange.addColumn(
- "address",
+ "new_column",
+ DataTypes.INT(),
+ null,
+ TableChange.ColumnPosition.last()),
+ TableChange.addColumn(
+ "new_column2",
DataTypes.STRING(),
"the address comment",
TableChange.ColumnPosition.last()));
@@ -237,29 +294,108 @@ class PaimonLakeCatalogTest {
assertThatThrownBy(
() ->
flussPaimonCatalog.alterTable(
- tablePath, changes3, new
TestingLakeCatalogContext()))
+ tablePath,
+ changes4,
+ getLakeCatalogContext(FLUSS_SCHEMA,
changes4)))
.isInstanceOf(InvalidAlterTableException.class)
- .hasMessage(
- "Column address already exists but with different
comment. Existing: null, Expected: the address comment");
+ .hasMessageContaining("Paimon schema is not compatible with
Fluss schema")
+ .hasMessageContaining(
+ String.format(
+ "therefore you need to add the diff columns
all at once, rather than applying other table changes: %s.",
+ changes4));
+
+        // no exception is thrown when the added columns already exist and match
in both fluss and paimon.
+ flussPaimonCatalog.alterTable(
+ tablePath, changes2, getLakeCatalogContext(FLUSS_SCHEMA,
changes2));
}
- private void createTable(String database, String tableName) {
- Schema flussSchema =
+ @Test
+ void testAlterTableAddColumnWhenPaimonSchemaNotMatch() throws Exception {
+ // this rarely happens only when new fluss lake table with an existed
paimon table or use
+ // alter table in paimon side directly.
+ String database = "test_alter_table_add_column_fluss_wider";
+ String tableName = "test_alter_table_add_column_fluss_wider";
+ createTable(database, tableName);
+ TablePath tablePath = TablePath.of(database, tableName);
+ org.apache.paimon.schema.Schema paimonSchema =
+ ((FileStoreTable)
+
flussPaimonCatalog.getPaimonCatalog().getTable(toPaimon(tablePath)))
+ .schema()
+ .toSchema();
+
+ List<TableChange> changes =
+ Collections.singletonList(
+ TableChange.addColumn(
+ "new_col",
+ DataTypes.INT(),
+ "new_col comment",
+ TableChange.ColumnPosition.last()));
+
+ // test column number mismatch.
+ Schema widerFlussSchema =
Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("amount", DataTypes.INT())
.column("address", DataTypes.STRING())
+ .column("phone", DataTypes.INT())
.build();
-
- TableDescriptor td =
- TableDescriptor.builder()
- .schema(flussSchema)
- .distributedBy(3) // no bucket key
+ assertThatThrownBy(
+ () ->
+ flussPaimonCatalog.alterTable(
+ tablePath,
+ changes,
+
getLakeCatalogContext(widerFlussSchema, changes)))
+ .isInstanceOf(InvalidAlterTableException.class)
+ .hasMessageContaining("Paimon schema is not compatible with
Fluss schema")
+ .hasMessageContaining(
+ String.format(
+ "therefore you need to add the diff columns
all at once, rather than applying other table changes: %s.",
+ changes));
+
+ // test column order mismatch.
+ Schema disorderflussSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("amount", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("address", DataTypes.STRING())
.build();
+ assertThatThrownBy(
+ () ->
+ flussPaimonCatalog.alterTable(
+ tablePath,
+ changes,
+
getLakeCatalogContext(disorderflussSchema, changes)))
+ .isInstanceOf(InvalidAlterTableException.class)
+ .hasMessageContaining("Paimon schema is not compatible with
Fluss schema")
+ .hasMessageContaining(
+ String.format(
+ "therefore you need to add the diff columns
all at once, rather than applying other table changes: %s.",
+ changes));
+ }
+ private void createTable(String database, String tableName) {
+ TableDescriptor td = getTableDescriptor(FLUSS_SCHEMA);
TablePath tablePath = TablePath.of(database, tableName);
- flussPaimonCatalog.createTable(tablePath, td, new
TestingLakeCatalogContext());
+ flussPaimonCatalog.createTable(tablePath, td, LAKE_CATALOG_CONTEXT);
+ }
+
+ private TestingLakeCatalogContext getLakeCatalogContext(
+ Schema schema, List<TableChange> schemaChanges) {
+ Schema expectedSchema = SchemaUpdate.applySchemaChanges(schema,
schemaChanges);
+ return new TestingLakeCatalogContext(
+ getTableDescriptor(schema),
getTableDescriptor(expectedSchema));
+ }
+
+ private TableDescriptor getTableDescriptor(Schema schema) {
+ return TableDescriptor.builder()
+ .schema(schema)
+ .property(TABLE_DATALAKE_ENABLED.key(), "true")
+ .property(TABLE_DATALAKE_FORMAT.key(), "paimon")
+ .property("table.datalake.paimon.warehouse",
tempWarehouseDir.toURI().toString())
+ .distributedBy(3) // no bucket key
+ .build();
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 6a846076a..88b1d5b08 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -389,7 +389,10 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
tablePath,
tableDescriptor,
new DefaultLakeCatalogContext(
- true,
currentSession().getPrincipal()));
+ true,
+ currentSession().getPrincipal(),
+ null,
+ tableDescriptor));
} catch (TableAlreadyExistException e) {
throw new LakeTableAlreadyExistException(e.getMessage(), e);
}
@@ -420,15 +423,12 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
+ "table properties or table schema.");
}
- LakeCatalog.Context lakeCatalogContext =
- new DefaultLakeCatalogContext(false,
currentSession().getPrincipal());
-
if (!alterSchemaChanges.isEmpty()) {
metadataManager.alterTableSchema(
tablePath,
alterSchemaChanges,
request.isIgnoreIfNotExists(),
- lakeCatalogContext);
+ currentSession().getPrincipal());
}
if (!alterTableConfigChanges.isEmpty()) {
@@ -437,7 +437,7 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
alterTableConfigChanges,
tablePropertyChanges,
request.isIgnoreIfNotExists(),
- lakeCatalogContext);
+ currentSession().getPrincipal());
}
return CompletableFuture.completedFuture(new AlterTableResponse());
@@ -1011,11 +1011,22 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
private final boolean isCreatingFlussTable;
private final FlussPrincipal flussPrincipal;
+ @Nullable private final TableDescriptor currentTable;
+ private final TableDescriptor expectedTable;
public DefaultLakeCatalogContext(
- boolean isCreatingFlussTable, FlussPrincipal flussPrincipal) {
+ boolean isCreatingFlussTable,
+ FlussPrincipal flussPrincipal,
+ @Nullable TableDescriptor currentTable,
+ TableDescriptor expectedTable) {
this.isCreatingFlussTable = isCreatingFlussTable;
this.flussPrincipal = flussPrincipal;
+ if (!isCreatingFlussTable) {
+ checkNotNull(
+ currentTable, "currentTable must be provided when
altering a Fluss table.");
+ }
+ this.currentTable = currentTable;
+ this.expectedTable = expectedTable;
}
@Override
@@ -1027,6 +1038,17 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
public FlussPrincipal getFlussPrincipal() {
return flussPrincipal;
}
+
+ @Nullable
+ @Override
+ public TableDescriptor getCurrentTable() {
+ return currentTable;
+ }
+
+ @Override
+ public TableDescriptor getExpectedTable() {
+ return expectedTable;
+ }
}
//
==================================================================================
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 8d7b61a08..02a02cc44 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -45,6 +45,7 @@ import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.security.acl.FlussPrincipal;
import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.DatabaseRegistration;
@@ -325,16 +326,23 @@ public class MetadataManager {
TablePath tablePath,
List<TableChange> schemaChanges,
boolean ignoreIfNotExists,
- LakeCatalog.Context lakeCatalogContext)
+ FlussPrincipal flussPrincipal)
throws TableNotExistException, TableNotPartitionedException {
try {
TableInfo table = getTable(tablePath);
+ TableDescriptor tableDescriptor = table.toTableDescriptor();
// validate the table column changes
if (!schemaChanges.isEmpty()) {
- Schema newSchema = SchemaUpdate.applySchemaChanges(table,
schemaChanges);
-
+                Schema newSchema =
+                        SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges);
+ LakeCatalog.Context lakeCatalogContext =
+ new CoordinatorService.DefaultLakeCatalogContext(
+ false,
+ flussPrincipal,
+ tableDescriptor,
+                                TableDescriptor.builder(tableDescriptor).schema(newSchema).build());
                 // Lake First: sync to Lake before updating Fluss schema
                 syncSchemaChangesToLake(tablePath, table, schemaChanges, lakeCatalogContext);
@@ -396,7 +404,7 @@ public class MetadataManager {
List<TableChange> tableChanges,
TablePropertyChanges tablePropertyChanges,
boolean ignoreIfNotExists,
- LakeCatalog.Context lakeCatalogContext) {
+ FlussPrincipal flussPrincipal) {
try {
             // it throws TableNotExistException if the table or database not exists
TableRegistration tableReg = getTableRegistration(tablePath);
@@ -422,11 +430,7 @@ public class MetadataManager {
             // pre alter table properties, e.g. create lake table in lake storage if it's to
// enable datalake for the table
preAlterTableProperties(
- tablePath,
- tableDescriptor,
- newDescriptor,
- tableChanges,
- lakeCatalogContext);
+                    tablePath, tableDescriptor, newDescriptor, tableChanges, flussPrincipal);
// update the table to zk
TableRegistration updatedTableRegistration =
tableReg.newProperties(
@@ -456,7 +460,10 @@ public class MetadataManager {
TableDescriptor tableDescriptor,
TableDescriptor newDescriptor,
List<TableChange> tableChanges,
- LakeCatalog.Context lakeCatalogContext) {
+ FlussPrincipal flussPrincipal) {
+ LakeCatalog.Context lakeCatalogContext =
+ new CoordinatorService.DefaultLakeCatalogContext(
+ false, flussPrincipal, tableDescriptor, newDescriptor);
LakeCatalog lakeCatalog =
lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
index 011871a0b..d1ad0c16c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java
@@ -17,20 +17,19 @@
package org.apache.fluss.server.coordinator;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.SchemaChangeException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
-import org.apache.fluss.metadata.TableInfo;
import java.util.List;
-import java.util.Objects;
/** Schema update. */
public class SchemaUpdate {
     /** Apply schema changes to the given table info and return the updated schema. */
-    public static Schema applySchemaChanges(TableInfo tableInfo, List<TableChange> changes) {
- SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo);
+    public static Schema applySchemaChanges(Schema initialSchema, List<TableChange> changes) {
+ SchemaUpdate schemaUpdate = new SchemaUpdate(initialSchema);
for (TableChange change : changes) {
schemaUpdate = schemaUpdate.applySchemaChange(change);
}
@@ -40,9 +39,9 @@ public class SchemaUpdate {
// Now we only maintain the Builder
private final Schema.Builder builder;
- public SchemaUpdate(TableInfo tableInfo) {
+ public SchemaUpdate(Schema initialSchema) {
// Initialize builder from the current table schema
- this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema());
+ this.builder = Schema.newBuilder().fromSchema(initialSchema);
}
public Schema getSchema() {
@@ -50,7 +49,7 @@ public class SchemaUpdate {
return builder.build();
}
- public SchemaUpdate applySchemaChange(TableChange columnChange) {
+ private SchemaUpdate applySchemaChange(TableChange columnChange) {
if (columnChange instanceof TableChange.AddColumn) {
return addColumn((TableChange.AddColumn) columnChange);
} else if (columnChange instanceof TableChange.ModifyColumn) {
@@ -69,13 +68,8 @@ public class SchemaUpdate {
Schema.Column existingColumn =
builder.getColumn(addColumn.getName()).orElse(null);
if (existingColumn != null) {
- if (!existingColumn.getDataType().equals(addColumn.getDataType())
- || !Objects.equals(
-                            existingColumn.getComment().orElse(null), addColumn.getComment())) {
- throw new IllegalArgumentException(
- "Column " + addColumn.getName() + " already exists.");
- }
- return this;
+ throw new InvalidAlterTableException(
+ "Column " + addColumn.getName() + " already exists.");
}
if (addColumn.getPosition() != TableChange.ColumnPosition.last()) {