Copilot commented on code in PR #1847:
URL: https://github.com/apache/fluss/pull/1847#discussion_r2517960050
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -142,6 +167,67 @@ private void alterTable(Identifier tablePath,
List<SchemaChange> tableChanges)
}
}
+ private void validatePaimonSchemaCapability(
+ Identifier tablePath, Schema existingSchema, Schema newSchema) {
+ // Adjust options for comparison
+ Map<String, String> existingOptions = new
HashMap<>(existingSchema.options());
+ Map<String, String> newOptions = new HashMap<>(newSchema.options());
+ // `path` will be set automatically by Paimon, so we need to remove it
in existing options
+ existingOptions.remove(CoreOptions.PATH.key());
+ // when enable datalake with an existing table,
`table.datalake.enabled` will be `false`
+ // in existing options, but `true` in new options.
+ String datalakeConfigKey = FLUSS_CONF_PREFIX +
ConfigOptions.TABLE_DATALAKE_ENABLED.key();
+ if
(Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey)))
{
+ existingOptions.remove(datalakeConfigKey);
+ newOptions.remove(datalakeConfigKey);
+ }
+
+ // Build schemas with adjusted options for comparison
+ Schema adjustedExistingSchema = buildSchemaWithOptions(existingSchema,
existingOptions);
+ Schema adjustedNewSchema = buildSchemaWithOptions(newSchema,
newOptions);
+
+ if (!adjustedExistingSchema.equals(adjustedNewSchema)) {
+ throw new TableAlreadyExistException(
+ String.format(
+ "The table %s already exists in Paimon catalog,
but the table schema is not compatible. "
+ + "Existing schema: %s, new schema: %s. "
+ + "Please first drop the table in Paimon
catalog or use a new table name.",
+ tablePath.getEscapedFullName(),
+ adjustedExistingSchema,
+ adjustedNewSchema));
+ }
+ }
+
+ private Schema buildSchemaWithOptions(Schema schema, Map<String, String>
options) {
+ Schema.Builder builder = Schema.newBuilder();
+ // Copy fields
+ for (org.apache.paimon.types.DataField field : schema.fields()) {
+ builder.column(field.name(), field.type(), field.description());
Review Comment:
The `field.description()` is copied to the new schema builder, but Paimon's
DataField may have nullable information that should also be preserved. Consider
verifying whether the field's nullable attribute is correctly maintained when
rebuilding the schema, or documenting why copying only the description is
sufficient for schema comparison.
```suggestion
builder.column(field.name(), field.type(), field.description(),
field.nullable());
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -410,38 +406,31 @@ private void preAlterTableProperties(
+ " in data lake, because the Fluss cluster
doesn't enable datalake tables.");
}
- boolean isLakeTableNewlyCreated = false;
// to enable lake table
if (!isDataLakeEnabled(tableDescriptor)) {
// before create table in fluss, we may create in lake
try {
lakeCatalog.createTable(tablePath, newDescriptor,
lakeCatalogContext);
- // no need to alter lake table if it is newly created
- isLakeTableNewlyCreated = true;
} catch (TableAlreadyExistException e) {
- // TODO: should tolerate if the lake exist but matches our
schema. This ensures
- // eventually consistent by idempotently creating the
table multiple times. See
- // #846
- throw new LakeTableAlreadyExistException(
- String.format(
- "The table %s already exists in %s
catalog, please "
- + "first drop the table in %s
catalog or use a new table name.",
- tablePath, dataLakeFormat,
dataLakeFormat));
+ throw new LakeTableAlreadyExistException(e.getMessage(),
e);
}
}
+ }
- // only need to alter lake table if it is not newly created
- if (!isLakeTableNewlyCreated) {
- {
- try {
- lakeCatalog.alterTable(tablePath, tableChanges,
lakeCatalogContext);
- } catch (TableNotExistException e) {
- throw new FlussRuntimeException(
- "Lake table doesn't exists for lake-enabled
table "
- + tablePath
- + ", which shouldn't be happened.
Please check if the lake table was deleted manually.",
- e);
- }
+ // We should always alter lake table even though datalake is disabled.
+ // Otherwise, if user alter the fluss table when datalake is disabled,
then enable datalake
+ // again, the lake table will mismatch.
+ if (lakeCatalog != null) {
+ try {
+ lakeCatalog.alterTable(tablePath, tableChanges,
lakeCatalogContext);
+ } catch (TableNotExistException e) {
+ // only throw TableNotExistException if datalake is enabled
+ if (isDataLakeEnabled(newDescriptor)) {
+ throw new FlussRuntimeException(
+ "Lake table doesn't exists for lake-enabled table "
Review Comment:
Corrected grammar: 'doesn't exists' should be 'doesn't exist'.
```suggestion
"Lake table doesn't exist for lake-enabled table
"
```
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -610,6 +860,84 @@ void testAlterLakeEnabledTableProperties() throws
Exception {
admin.alterTable(TablePath.of(DATABASE, "not_exist_table"),
tableChanges, true).get();
}
+ @Test
+ void testEnableLakeTableAfterAlterTableProperties() throws Exception {
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("k1", "v1");
+ customProperties.put("paimon.file.format", "parquet");
+
+ // create table
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+ .customProperties(customProperties)
+ .distributedBy(BUCKET_NUM, "c1", "c2")
+ .build();
+ TablePath tablePath = TablePath.of(DATABASE,
"enable_lake_table_after_alter_properties");
+ admin.createTable(tablePath, tableDescriptor, false).get();
+ // paimon table should not exist because lake table is disable
+ assertThatThrownBy(
+ () ->
+ paimonCatalog.getTable(
+ Identifier.create(DATABASE,
tablePath.getTableName())))
+ .isInstanceOf(Catalog.TableNotExistException.class)
+ .hasMessage(String.format("Table %s does not exist.",
tablePath));
+
+ // alter fluss table properties
+ List<TableChange> tableChanges =
+ Arrays.asList(TableChange.reset("k1"), TableChange.set("k2",
"v2"));
+ admin.alterTable(tablePath, tableChanges, false).get();
+ // enable lake table should be ok
+ TableChange.SetOption enableLake =
+ TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true");
+ admin.alterTable(tablePath, Collections.singletonList(enableLake),
false).get();
+ Table paimonTable =
+ paimonCatalog.getTable(Identifier.create(DATABASE,
tablePath.getTableName()));
+ customProperties.remove("k1");
+ customProperties.put("k2", "v2");
+ Map<String, String> newProperties = new
HashMap<>(tableDescriptor.getProperties());
+ newProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+ tableDescriptor = tableDescriptor.withProperties(newProperties,
customProperties);
+ verifyPaimonTable(
+ paimonTable,
+ tableDescriptor,
+ RowType.of(
+ new DataType[] {
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.STRING(),
+ // for __bucket, __offset, __timestamp
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.BIGINT(),
+
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ },
+ new String[] {
+ "c1",
+ "c2",
+ BUCKET_COLUMN_NAME,
+ OFFSET_COLUMN_NAME,
+ TIMESTAMP_COLUMN_NAME
+ }),
+ "c1,c2",
+ BUCKET_NUM);
+
+ // disable lake table
+ TableChange.SetOption disableLake =
+ TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"false");
+ admin.alterTable(tablePath, Collections.singletonList(disableLake),
false).get();
+
+ // alter fluss table properties when lake table is disabled
+ tableChanges = Collections.singletonList(TableChange.set("k2", "v22"));
+ admin.alterTable(tablePath, tableChanges, false).get();
+
+ // enable lake table again should be ok, even though the table
properties are changed
Review Comment:
Corrected grammar in comment: 'even though the table properties are changed'
should be 'even though the table properties have changed'.
```suggestion
// enable lake table again should be ok, even though the table
properties have changed
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]