luoyuxia commented on code in PR #1847:
URL: https://github.com/apache/fluss/pull/1847#discussion_r2518010513


##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -474,16 +732,8 @@ void testAlterLakeEnabledLogTable() throws Exception {
 
         // try to enable lake table again

Review Comment:
   Enhance the test with the most common case:
   1: write some data
   2: disable datalake 
   3: enable datalake again, verify data still exist. we can just verify 
snapshot for clean code.
   
   



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java:
##########
@@ -142,6 +167,67 @@ private void alterTable(Identifier tablePath, 
List<SchemaChange> tableChanges)
         }
     }
 
+    private void validatePaimonSchemaCapability(
+            Identifier tablePath, Schema existingSchema, Schema newSchema) {
+        // Adjust options for comparison
+        Map<String, String> existingOptions = new 
HashMap<>(existingSchema.options());

Review Comment:
   I'm wondering whether the following code is better:
   ```
   private void validatePaimonSchemaCapability(
               Identifier tablePath, Schema existingSchema, Schema newSchema) {
           // Adjust options for comparison
           Map<String, String> existingOptions = existingSchema.options();
           Map<String, String> newOptions =newSchema.options();
           // `path` will be set automatically by Paimon, so we need to remove 
it in existing options
           existingOptions.remove(CoreOptions.PATH.key());
           // when enable datalake with an existing table, 
`table.datalake.enabled` will be `false`
           // in existing options, but `true` in new options.
           String datalakeConfigKey = FLUSS_CONF_PREFIX + 
ConfigOptions.TABLE_DATALAKE_ENABLED.key();
           if 
(Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey)))
 {
               existingOptions.remove(datalakeConfigKey);
               newOptions.remove(datalakeConfigKey);
           }
   
           if (!existingSchema.equals(newSchema)) {
               throw new TableAlreadyExistException(
                       String.format(
                               "The table %s already exists in Paimon catalog, 
but the table schema is not compatible. "
                                       + "Existing schema: %s, new schema: %s. "
                                       + "Please first drop the table in Paimon 
catalog or use a new table name.",
                               tablePath.getEscapedFullName(),
                               existingSchema,
                               newSchema));
           }
       }
   ```
   which modify schema's option directly.
   
   I'm afraid that paimon schema introduce another field and we forget set the 
field in method `buildSchemaWithOptions`



##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -401,6 +408,257 @@ void 
testCreateLakeEnableTableWithUnsettablePaimonOptions() {
         }
     }
 
+    @Test
+    void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // 1. test for existing lake table without bucket keys
+        TableDescriptor logTableWithoutBucketKeys =
+                createTableDescriptor(
+                        2,
+                        BUCKET_NUM,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        customProperties,
+                        false);
+        TablePath logTablePathWithoutBucketkeys =
+                TablePath.of(DATABASE, "log_table_without_bucket_keys");
+        admin.createTable(logTablePathWithoutBucketkeys, 
logTableWithoutBucketKeys, false).get();
+        // drop fluss table, lake table should still exist
+        admin.dropTable(logTablePathWithoutBucketkeys, false).get();
+        // create the same fluss table again should be ok
+        admin.createTable(logTablePathWithoutBucketkeys, 
logTableWithoutBucketKeys, false).get();
+        admin.dropTable(logTablePathWithoutBucketkeys, false).get();
+
+        // paimon table use dynamic bucket for fluss log table without bucket 
keys
+        // so it should be ok to create the same fluss table with a new bucket 
num
+        logTableWithoutBucketKeys = 
logTableWithoutBucketKeys.withBucketCount(BUCKET_NUM + 1);
+        admin.createTable(logTablePathWithoutBucketkeys, 
logTableWithoutBucketKeys, false).get();
+        admin.dropTable(logTablePathWithoutBucketkeys, false).get();
+
+        // create log table with bucket keys will throw exception
+        TableDescriptor logTableWithoutBucketKeys1 =
+                createTableDescriptor(
+                        2,
+                        BUCKET_NUM,
+                        Arrays.asList("c1", "c2"),
+                        Collections.emptyList(),
+                        customProperties,
+                        false);
+        assertThatThrownBy(
+                        () ->
+                                admin.createTable(
+                                                logTablePathWithoutBucketkeys,
+                                                logTableWithoutBucketKeys1,
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(LakeTableAlreadyExistException.class)
+                .hasMessage(
+                        "The table `fluss`.`log_table_without_bucket_keys` 
already exists in Paimon catalog, but the table schema is not compatible. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Please first drop the table in Paimon 
catalog or use a new table name.");
+
+        // create log table with different fields will throw exception
+        TableDescriptor logTableWithoutBucketKeys2 =
+                createTableDescriptor(
+                        3,
+                        BUCKET_NUM,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        customProperties,
+                        false);
+        assertThatThrownBy(
+                        () ->
+                                admin.createTable(
+                                                logTablePathWithoutBucketkeys,
+                                                logTableWithoutBucketKeys2,
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(LakeTableAlreadyExistException.class)
+                .hasMessage(
+                        "The table `fluss`.`log_table_without_bucket_keys` 
already exists in Paimon catalog, but the table schema is not compatible. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` 
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], 
options={bucket=-1, fluss.table.replication.factor=1, 
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, 
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Please first drop the table in Paimon 
catalog or use a new table name.");
+
+        // create table with primary keys will throw exception

Review Comment:
   I think verify the basic cases should be fine. Not need to verify all case 
in here since the code path should be same.
   ```
   void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
           Map<String, String> customProperties = new HashMap<>();
           customProperties.put("k1", "v1");
           customProperties.put("paimon.file.format", "parquet");
   
           // 1. test for existing lake table without bucket keys
           TableDescriptor logTableWithoutBucketKeys =
                   createTableDescriptor(
                           2,
                           BUCKET_NUM,
                           Collections.emptyList(),
                           Collections.emptyList(),
                           customProperties,
                           false);
           TablePath logTablePathWithoutBucketkeys =
                   TablePath.of(DATABASE, "log_table_without_bucket_keys");
           admin.createTable(logTablePathWithoutBucketkeys, 
logTableWithoutBucketKeys, false).get();
           // drop fluss table, lake table should still exist
           admin.dropTable(logTablePathWithoutBucketkeys, false).get();
           // create the same fluss table again should be ok
           admin.createTable(logTablePathWithoutBucketkeys, 
logTableWithoutBucketKeys, false).get();
           admin.dropTable(logTablePathWithoutBucketkeys, false).get();
   
           // paimon table use dynamic bucket for fluss log table without 
bucket keys
           // so it should be ok to create the same fluss table with a new 
bucket num
           logTableWithoutBucketKeys = 
logTableWithoutBucketKeys.withBucketCount(BUCKET_NUM + 1);
           admin.createTable(logTablePathWithoutBucketkeys, 
logTableWithoutBucketKeys, false).get();
           admin.dropTable(logTablePathWithoutBucketkeys, false).get();
   
           // create log table with bucket keys will throw exception
           TableDescriptor logTableWithoutBucketKeys1 =
                   createTableDescriptor(
                           2,
                           BUCKET_NUM,
                           Arrays.asList("c1", "c2"),
                           Collections.emptyList(),
                           customProperties,
                           false);
           assertThatThrownBy(
                           () ->
                                   admin.createTable(
                                                   
logTablePathWithoutBucketkeys,
                                                   logTableWithoutBucketKeys1,
                                                   false)
                                           .get())
                   .cause()
                   .isInstanceOf(LakeTableAlreadyExistException.class)
                   .hasMessage(
                           "The table `fluss`.`log_table_without_bucket_keys` 
already exists in Paimon catalog, but the table schema is not compatible. "
                                   + "Existing schema: 
UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, 
`__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], 
primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, 
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, 
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
                                   + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
                                   + "Please first drop the table in Paimon 
catalog or use a new table name.");
   
           // create log table with different fields will throw exception
           TableDescriptor logTableWithoutBucketKeys2 =
                   createTableDescriptor(
                           3,
                           BUCKET_NUM,
                           Collections.emptyList(),
                           Collections.emptyList(),
                           customProperties,
                           false);
           assertThatThrownBy(
                           () ->
                                   admin.createTable(
                                                   
logTablePathWithoutBucketkeys,
                                                   logTableWithoutBucketKeys2,
                                                   false)
                                           .get())
                   .cause()
                   .isInstanceOf(LakeTableAlreadyExistException.class)
                   .hasMessage(
                           "The table `fluss`.`log_table_without_bucket_keys` 
already exists in Paimon catalog, but the table schema is not compatible. "
                                   + "Existing schema: 
UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, 
`__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], 
primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, 
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, 
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
                                   + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` 
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], 
options={bucket=-1, fluss.table.replication.factor=1, 
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, 
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
                                   + "Please first drop the table in Paimon 
catalog or use a new table name.");
   
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to