LiebingYu commented on code in PR #1995:
URL: https://github.com/apache/fluss/pull/1995#discussion_r2538196297


##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -476,6 +477,54 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
                                 + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
                                 + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
                                 + "Please first drop the table in Paimon catalog or use a new table name.");
+
+        // add an insignificant option to Paimon table will be ok
+        Identifier paimonTablePath =
+                Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
+        SchemaChange schemaChange1 = SchemaChange.setOption("any.k1", "any.v1");
+        paimonCatalog.alterTable(paimonTablePath, Collections.singletonList(schemaChange1), false);
+        admin.createTable(tablePath, td, false).get();
+        admin.dropTable(tablePath, false).get();
+
+        // alter a Fluss option to Paimon table will throw exception
+        SchemaChange schemaChange2 = SchemaChange.setOption("fluss.k1", "v2");
+        paimonCatalog.alterTable(paimonTablePath, Collections.singletonList(schemaChange2), false);
+        TableDescriptor finalTd = td;
+        assertThatThrownBy(() -> admin.createTable(tablePath, finalTd, false).get())
+                .cause()
+                .isInstanceOf(LakeTableAlreadyExistException.class)
+                .hasMessageContaining(
+                        "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, "
+                                + "but the table schema is not compatible.");
+
+        // reset fluss.k1 in Paimon
+        SchemaChange schemaChange3 = SchemaChange.setOption("fluss.k1", "v1");
+        paimonCatalog.alterTable(paimonTablePath, Collections.singletonList(schemaChange3), false);
+
+        // add a new Paimon option (not specified in the Fluss table) to Paimon table will be ok

Review Comment:
   These lines only reset `fluss.k1` back to `v1` in Paimon. Without the reset, 
the code that follows would not work.
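
   To illustrate why the reset matters, here is a minimal sketch of the rule the 
test appears to exercise. It is not the actual Fluss implementation: the class 
`OptionCompatSketch` and the method `isCompatible` are hypothetical, and the 
rule itself (every option expected by the Fluss table must match the existing 
Paimon table, while extra Paimon-only options are ignored) is an assumption 
inferred from the test comments above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not Fluss code: models the option check implied by the test.
public class OptionCompatSketch {

    // Assumed rule: each expected option must be present with the same value in
    // the existing table; options that exist only in the lake table are ignored.
    static boolean isCompatible(Map<String, String> existing, Map<String, String> expected) {
        for (Map.Entry<String, String> e : expected.entrySet()) {
            if (!Objects.equals(existing.get(e.getKey()), e.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> expected = new HashMap<>();
        expected.put("fluss.k1", "v1");

        Map<String, String> paimon = new HashMap<>(expected);
        paimon.put("any.k1", "any.v1"); // extra option: still compatible
        System.out.println(isCompatible(paimon, expected));

        paimon.put("fluss.k1", "v2"); // altered Fluss option: incompatible
        System.out.println(isCompatible(paimon, expected));

        paimon.put("fluss.k1", "v1"); // the reset: compatible again
        System.out.println(isCompatible(paimon, expected));
    }
}
```

   Under this assumed rule, any later `createTable` call would keep failing 
until `fluss.k1` is restored, which is what the reset step guards against.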


