luoyuxia commented on code in PR #1995:
URL: https://github.com/apache/fluss/pull/1995#discussion_r2538155065


##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -476,6 +477,54 @@ void testCreateLakeEnableTableWithExistLakeTable() throws 
Exception {
                                 + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
                                 + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` 
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], 
options={bucket=-1, fluss.table.replication.factor=1, 
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, 
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
                                 + "Please first drop the table in Paimon 
catalog or use a new table name.");
+
+        // add an insignificant option to Paimon table will be ok
+        Identifier paimonTablePath =
+                Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
+        SchemaChange schemaChange1 = SchemaChange.setOption("any.k1", 
"any.v1");
+        paimonCatalog.alterTable(paimonTablePath, 
Collections.singletonList(schemaChange1), false);
+        admin.createTable(tablePath, td, false).get();
+        admin.dropTable(tablePath, false).get();
+
+        // alter a Fluss option to Paimon table will throw exception
+        SchemaChange schemaChange2 = SchemaChange.setOption("fluss.k1", "v2");
+        paimonCatalog.alterTable(paimonTablePath, 
Collections.singletonList(schemaChange2), false);
+        TableDescriptor finalTd = td;
+        assertThatThrownBy(() -> admin.createTable(tablePath, finalTd, 
false).get())
+                .cause()
+                .isInstanceOf(LakeTableAlreadyExistException.class)
+                .hasMessageContaining(
+                        "The table `fluss`.`log_table_with_exist_lake_table` 
already exists in Paimon catalog, "
+                                + "but the table schema is not compatible.");
+
+        // reset fluss.k1 in Paimon
+        SchemaChange schemaChange3 = SchemaChange.setOption("fluss.k1", "v1");
+        paimonCatalog.alterTable(paimonTablePath, 
Collections.singletonList(schemaChange3), false);
+
+        // add a new Paimon option (not specified in the Fluss table) to 
Paimon table will be ok

Review Comment:
   Shouldn't modifying a Paimon option also throw an exception? Imagine you 
modify the partition bucket key.



##########
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java:
##########
@@ -282,6 +282,12 @@ public CompletableFuture<LimitScanResponse> 
limitScan(LimitScanRequest request)
     @Override
     public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
             NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) {
+        // This method should only be called by Coordinator internally
+        if (!currentSession().isInternal()) {

Review Comment:
   Why was this changed?



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.exception.TableAlreadyExistException;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
+
+/** Utils to verify whether the existing Paimon table is compatible with the 
table to be created. */
+public class PaimonTableValidation {
+
+    private static final Map<String, ConfigOption<?>> PAIMON_CONFIGS = 
extractPaimonConfigs();
+
+    public static void validatePaimonSchemaCapability(
+            Identifier tablePath, Schema existingSchema, Schema newSchema) {
+        // Adjust options for comparison
+        Map<String, String> existingOptions = existingSchema.options();
+        Map<String, String> newOptions = newSchema.options();
+
+        // when enable datalake with an existing table, 
`table.datalake.enabled` will be `false`
+        // in existing options, but `true` in new options.
+        String datalakeConfigKey =
+                FLUSS_CONF_PREFIX
+                        + 
org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED.key();
+        if 
(Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey)))
 {
+            existingOptions.remove(datalakeConfigKey);
+            newOptions.remove(datalakeConfigKey);
+        }
+
+        // remove changeable options
+        removeChangeableOptions(existingOptions);
+        removeChangeableOptions(newOptions);
+
+        // ignore the existing options that are not in new options
+        existingOptions.entrySet().removeIf(entry -> 
!newOptions.containsKey(entry.getKey()));

Review Comment:
   Why is this line needed? Could you please give me an example?
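
   For reference, here is a minimal sketch of what the `removeIf` line does 
to the option maps before they are compared. The class name and the helper 
`retainKeysPresentInNew` are hypothetical and not part of the PR. The option 
keys are taken from the test in this PR. The final equality check is only an 
assumption about how the filtered maps end up being compared.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, standalone illustration; not the PR's PaimonTableValidation class.
public class IgnoreExtraExistingOptionsSketch {

    /** Returns a copy of existingOptions keeping only the keys that newOptions also has. */
    static Map<String, String> retainKeysPresentInNew(
            Map<String, String> existingOptions, Map<String, String> newOptions) {
        Map<String, String> filtered = new HashMap<>(existingOptions);
        // Same predicate as the line under review: drop keys unknown to the new schema.
        filtered.entrySet().removeIf(entry -> !newOptions.containsKey(entry.getKey()));
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, String> newOptions = new HashMap<>();
        newOptions.put("fluss.k1", "v1");

        // The existing Paimon table carries an extra option the Fluss table never set.
        Map<String, String> existingOptions = new HashMap<>();
        existingOptions.put("fluss.k1", "v1");
        existingOptions.put("any.k1", "any.v1");

        // The extra key is dropped, so the remaining options match.
        System.out.println(retainKeysPresentInNew(existingOptions, newOptions).equals(newOptions));

        // A key present on both sides with a different value still causes a mismatch.
        existingOptions.put("fluss.k1", "v2");
        System.out.println(retainKeysPresentInNew(existingOptions, newOptions).equals(newOptions));
    }
}
```

   In this sketch, any key that is missing from the new options is ignored 
whatever its meaning, so an option set only on the Paimon side would not be 
detected by the comparison.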



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonTableValidation.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.exception.TableAlreadyExistException;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
+
+/** Utils to verify whether the existing Paimon table is compatible with the 
table to be created. */
+public class PaimonTableValidation {
+
+    private static final Map<String, ConfigOption<?>> PAIMON_CONFIGS = 
extractPaimonConfigs();
+
+    public static void validatePaimonSchemaCapability(

Review Comment:
   ```suggestion
       public static void validatePaimonSchemaCompatible(
   ```


