This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 32003fcaf [lake] Tolerate Paimon lake table existent if the schema and 
properties matches (#1847)
32003fcaf is described below

commit 32003fcaf420b08709691c3a996291357f002795
Author: Liebing <[email protected]>
AuthorDate: Wed Nov 12 21:39:54 2025 +0800

    [lake] Tolerate Paimon lake table existent if the schema and properties 
matches (#1847)
---
 .../apache/fluss/lake/lakestorage/LakeCatalog.java |   7 +
 .../lakestorage/TestingLakeCatalogContext.java     |   5 +
 .../fluss/flink/catalog/FlinkCatalogTest.java      |  10 +-
 .../fluss/lake/paimon/PaimonLakeCatalog.java       |  67 ++++-
 .../{ => utils}/FlussDataTypeToPaimonDataType.java |   2 +-
 .../fluss/lake/paimon/utils/PaimonConversions.java |   4 +-
 .../lake/paimon/LakeEnabledTableCreateITCase.java  | 275 +++++++++++++++++++--
 .../fluss/lake/paimon/PaimonLakeCatalogTest.java   |  12 +-
 .../server/coordinator/CoordinatorService.java     |  26 +-
 .../fluss/server/coordinator/MetadataManager.java  |  43 ++--
 .../server/coordinator/LakeTableManagerITCase.java |  10 +-
 .../lakehouse/TestingPaimonStoragePlugin.java      |   5 +-
 fluss-test-coverage/pom.xml                        |   2 +-
 13 files changed, 386 insertions(+), 82 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java 
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
index 4048ca007..4cbccb6c1 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
@@ -75,6 +75,13 @@ public interface LakeCatalog extends AutoCloseable {
     @PublicEvolving
     interface Context {
 
+        /**
+         * Whether the current operation is creating a fluss table.
+         *
+         * @return true if the current operation is creating a fluss table
+         */
+        boolean isCreatingFlussTable();
+
         /** Get the fluss principal currently accessing the catalog. */
         FlussPrincipal getFlussPrincipal();
     }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
 
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
index b57ff94cf..7406b13b6 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
@@ -22,6 +22,11 @@ import org.apache.fluss.security.acl.FlussPrincipal;
 /** A testing implementation of {@link LakeCatalog.Context}. */
 public class TestingLakeCatalogContext implements LakeCatalog.Context {
 
+    @Override
+    public boolean isCreatingFlussTable() {
+        return false;
+    }
+
     @Override
     public FlussPrincipal getFlussPrincipal() {
         return null;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 84a85dead..abaf8e7e0 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -308,13 +308,9 @@ class FlinkCatalogTest {
         assertThat(catalog.tableExists(lakeTablePath)).isTrue();
         // drop fluss table
         catalog.dropTable(lakeTablePath, false);
-        // create the table again, should throw exception with ignore if exist 
= false
-        assertThatThrownBy(() -> catalog.createTable(lakeTablePath, table, 
false))
-                .isInstanceOf(CatalogException.class)
-                .hasMessage(
-                        String.format(
-                                "The table %s already exists in %s catalog, 
please first drop the table in %s catalog or use a new table name.",
-                                lakeTablePath, "paimon", "paimon"));
+        assertThat(catalog.tableExists(lakeTablePath)).isFalse();
+        // create the table again should be ok, because the existing lake 
table is matched
+        catalog.createTable(lakeTablePath, table, false);
     }
 
     @Test
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index 22a189208..712c2af38 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.lake.paimon;
 
 import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotExistException;
@@ -27,6 +28,7 @@ import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.IOUtils;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -34,12 +36,16 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
+import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.FLUSS_CONF_PREFIX;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchema;
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonSchemaChanges;
@@ -83,12 +89,12 @@ public class PaimonLakeCatalog implements LakeCatalog {
         Identifier paimonPath = toPaimon(tablePath);
         Schema paimonSchema = toPaimonSchema(tableDescriptor);
         try {
-            createTable(paimonPath, paimonSchema);
+            createTable(paimonPath, paimonSchema, 
context.isCreatingFlussTable());
         } catch (Catalog.DatabaseNotExistException e) {
             // create database
             createDatabase(tablePath.getDatabaseName());
             try {
-                createTable(paimonPath, paimonSchema);
+                createTable(paimonPath, paimonSchema, 
context.isCreatingFlussTable());
             } catch (Catalog.DatabaseNotExistException t) {
                 // shouldn't happen in normal cases
                 throw new RuntimeException(
@@ -114,13 +120,31 @@ public class PaimonLakeCatalog implements LakeCatalog {
         }
     }
 
-    private void createTable(Identifier tablePath, Schema schema)
+    private void createTable(Identifier tablePath, Schema schema, boolean 
isCreatingFlussTable)
             throws Catalog.DatabaseNotExistException {
         try {
             // not ignore if table exists
             paimonCatalog.createTable(tablePath, schema, false);
         } catch (Catalog.TableAlreadyExistException e) {
-            throw new TableAlreadyExistException("Table " + tablePath + " 
already exists.");
+            try {
+                Table table = paimonCatalog.getTable(tablePath);
+                FileStoreTable fileStoreTable = (FileStoreTable) table;
+                validatePaimonSchemaCapability(
+                        tablePath, fileStoreTable.schema().toSchema(), schema);
+                // if creating a new fluss table, we should ensure the lake 
table is empty
+                if (isCreatingFlussTable) {
+                    checkTableIsEmpty(tablePath, fileStoreTable);
+                }
+            } catch (Catalog.TableNotExistException tableNotExistException) {
+                // shouldn't happen in normal cases
+                throw new RuntimeException(
+                        String.format(
+                                "Failed to create table %s in Paimon. The 
table already existed "
+                                        + "during the initial creation 
attempt, but subsequently "
+                                        + "could not be found when trying to 
get it. "
+                                        + "Please check whether the Paimon 
table was manually deleted, and try again.",
+                                tablePath));
+            }
         }
     }
 
@@ -142,6 +166,41 @@ public class PaimonLakeCatalog implements LakeCatalog {
         }
     }
 
+    private void validatePaimonSchemaCapability(
+            Identifier tablePath, Schema existingSchema, Schema newSchema) {
+        // Adjust options for comparison
+        Map<String, String> existingOptions = existingSchema.options();
+        Map<String, String> newOptions = newSchema.options();
+        // `path` will be set automatically by Paimon, so we need to remove it 
in existing options
+        existingOptions.remove(CoreOptions.PATH.key());
+        // when enable datalake with an existing table, 
`table.datalake.enabled` will be `false`
+        // in existing options, but `true` in new options.
+        String datalakeConfigKey = FLUSS_CONF_PREFIX + 
ConfigOptions.TABLE_DATALAKE_ENABLED.key();
+        if 
(Boolean.FALSE.toString().equalsIgnoreCase(existingOptions.get(datalakeConfigKey)))
 {
+            existingOptions.remove(datalakeConfigKey);
+            newOptions.remove(datalakeConfigKey);
+        }
+
+        if (!existingSchema.equals(newSchema)) {
+            throw new TableAlreadyExistException(
+                    String.format(
+                            "The table %s already exists in Paimon catalog, 
but the table schema is not compatible. "
+                                    + "Existing schema: %s, new schema: %s. "
+                                    + "Please first drop the table in Paimon 
catalog or use a new table name.",
+                            tablePath.getEscapedFullName(), existingSchema, 
newSchema));
+        }
+    }
+
+    private void checkTableIsEmpty(Identifier tablePath, FileStoreTable table) 
{
+        if (table.latestSnapshot().isPresent()) {
+            throw new TableAlreadyExistException(
+                    String.format(
+                            "The table %s already exists in Paimon catalog, 
and the table is not empty. "
+                                    + "Please first drop the table in Paimon 
catalog or use a new table name.",
+                            tablePath.getEscapedFullName()));
+        }
+    }
+
     @Override
     public void close() {
         IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java
similarity index 99%
rename from 
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java
rename to 
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java
index 204b35119..e97b4f67a 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/FlussDataTypeToPaimonDataType.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.lake.paimon;
+package org.apache.fluss.lake.paimon.utils;
 
 import org.apache.fluss.types.ArrayType;
 import org.apache.fluss.types.BigIntType;
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index 1f57ba3b6..03f9ae753 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -20,7 +20,6 @@ package org.apache.fluss.lake.paimon.utils;
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
-import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
 import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -57,7 +56,7 @@ public class PaimonConversions {
     private static final String PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY = 
"partition.legacy-name";
 
     // for fluss config
-    private static final String FLUSS_CONF_PREFIX = "fluss.";
+    public static final String FLUSS_CONF_PREFIX = "fluss.";
     // for paimon config
     private static final String PAIMON_CONF_PREFIX = "paimon.";
 
@@ -67,6 +66,7 @@ public class PaimonConversions {
     static {
         PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
         PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
+        PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key());
         
PAIMON_UNSETTABLE_OPTIONS.add(PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY);
     }
 
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 37033ea74..3e47bdc46 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -34,12 +34,20 @@ import 
org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.junit.jupiter.api.AfterEach;
@@ -55,6 +63,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS;
@@ -401,6 +410,110 @@ class LakeEnabledTableCreateITCase {
         }
     }
 
+    @Test
+    void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // test for existing lake table
+        TableDescriptor td =
+                createTableDescriptor(
+                        2,
+                        BUCKET_NUM,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        customProperties,
+                        false);
+        TablePath tablePath = TablePath.of(DATABASE, 
"log_table_with_exist_lake_table");
+        admin.createTable(tablePath, td, false).get();
+        // drop fluss table, lake table should still exist
+        admin.dropTable(tablePath, false).get();
+        // create the same fluss table again should be ok
+        admin.createTable(tablePath, td, false).get();
+        admin.dropTable(tablePath, false).get();
+
+        // paimon table use dynamic bucket for fluss log table without bucket 
keys
+        // so it should be ok to create the same fluss table with a new bucket 
num
+        td = td.withBucketCount(BUCKET_NUM + 1);
+        admin.createTable(tablePath, td, false).get();
+        admin.dropTable(tablePath, false).get();
+
+        // create log table with bucket keys will throw exception
+        TableDescriptor logTableWithoutBucketKeys1 =
+                createTableDescriptor(
+                        2,
+                        BUCKET_NUM,
+                        Arrays.asList("c1", "c2"),
+                        Collections.emptyList(),
+                        customProperties,
+                        false);
+        assertThatThrownBy(
+                        () -> admin.createTable(tablePath, 
logTableWithoutBucketKeys1, false).get())
+                .cause()
+                .isInstanceOf(LakeTableAlreadyExistException.class)
+                .hasMessage(
+                        "The table `fluss`.`log_table_with_exist_lake_table` 
already exists in Paimon catalog, but the table schema is not compatible. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Please first drop the table in Paimon 
catalog or use a new table name.");
+
+        // create log table with different fields will throw exception
+        TableDescriptor logTableWithoutBucketKeys2 =
+                createTableDescriptor(
+                        3,
+                        BUCKET_NUM,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        customProperties,
+                        false);
+        assertThatThrownBy(
+                        () -> admin.createTable(tablePath, 
logTableWithoutBucketKeys2, false).get())
+                .cause()
+                .isInstanceOf(LakeTableAlreadyExistException.class)
+                .hasMessage(
+                        "The table `fluss`.`log_table_with_exist_lake_table` 
already exists in Paimon catalog, but the table schema is not compatible. "
+                                + "Existing schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) 
WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, 
fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, 
fluss.table.datalake.format=paimon, partition.legacy-name=false, 
file.format=parquet, fluss.k1=v1}, comment=null}, "
+                                + "new schema: UpdateSchema{fields=[`c1` 
STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` 
TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], 
options={bucket=-1, fluss.table.replication.factor=1, 
fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, 
partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
+                                + "Please first drop the table in Paimon 
catalog or use a new table name.");
+    }
+
+    @Test
+    void testCreateLakeEnableTableWithExistNonEmptyLakeTable() throws 
Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // test for existing lake table with some data
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .build())
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TablePath tablePath = TablePath.of(DATABASE, 
"log_table_with_non_empty_lake_table");
+        admin.createTable(tablePath, td, false).get();
+        // drop fluss table, lake table should still exist
+        admin.dropTable(tablePath, false).get();
+        // create the same fluss table again should be ok
+        admin.createTable(tablePath, td, false).get();
+        admin.dropTable(tablePath, false).get();
+
+        // write some data to the lake table
+        writeData(
+                paimonCatalog.getTable(
+                        Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName())));
+        assertThatThrownBy(() -> admin.createTable(tablePath, td, false).get())
+                .cause()
+                .isInstanceOf(LakeTableAlreadyExistException.class)
+                .hasMessage(
+                        "The table 
`fluss`.`log_table_with_non_empty_lake_table` already exists in Paimon catalog, 
and the table is not empty. Please first drop the table in Paimon catalog or 
use a new table name.");
+    }
+
     @Test
     void testAlterLakeEnabledLogTable() throws Exception {
         Map<String, String> customProperties = new HashMap<>();
@@ -435,8 +548,8 @@ class LakeEnabledTableCreateITCase {
 
         admin.alterTable(logTablePath, changes, false).get();
 
-        Table enabledPaimonLogTable =
-                paimonCatalog.getTable(Identifier.create(DATABASE, 
logTablePath.getTableName()));
+        Identifier paimonTablePath = Identifier.create(DATABASE, 
logTablePath.getTableName());
+        Table enabledPaimonLogTable = paimonCatalog.getTable(paimonTablePath);
 
         Map<String, String> updatedProperties = new HashMap<>();
         updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true");
@@ -470,20 +583,26 @@ class LakeEnabledTableCreateITCase {
         changes = Collections.singletonList(disableLake);
         admin.alterTable(logTablePath, changes, false).get();
         // paimon table should still exist although lake is disabled
-        paimonCatalog.getTable(Identifier.create(DATABASE, 
logTablePath.getTableName()));
+        paimonCatalog.getTable(paimonTablePath);
 
         // try to enable lake table again
         enableLake = 
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
-        List<TableChange> finalChanges = Collections.singletonList(enableLake);
-        // TODO: After #846 is implemented, we should remove this exception 
assertion.
-        assertThatThrownBy(() -> admin.alterTable(logTablePath, finalChanges, 
false).get())
-                .cause()
-                .isInstanceOf(LakeTableAlreadyExistException.class)
-                .hasMessage(
-                        String.format(
-                                "The table %s already exists in paimon 
catalog, please "
-                                        + "first drop the table in paimon 
catalog or use a new table name.",
-                                logTablePath));
+        changes = Collections.singletonList(enableLake);
+        admin.alterTable(logTablePath, changes, false).get();
+
+        // write some data to the lake table
+        writeData(paimonCatalog.getTable(paimonTablePath));
+        Optional<Snapshot> snapshot = 
paimonCatalog.getTable(paimonTablePath).latestSnapshot();
+        assertThat(snapshot).isNotEmpty();
+
+        // disable lake table again
+        changes = Collections.singletonList(disableLake);
+        admin.alterTable(logTablePath, changes, false).get();
+
+        // try to enable lake table again, the snapshot should not change
+        changes = Collections.singletonList(enableLake);
+        admin.alterTable(logTablePath, changes, false).get();
+        
assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot);
     }
 
     @Test
@@ -602,7 +721,7 @@ class LakeEnabledTableCreateITCase {
                 .cause()
                 .isInstanceOf(FlussRuntimeException.class)
                 .hasMessageContaining(
-                        "Lake table doesn't exists for lake-enabled table "
+                        "Lake table doesn't exist for lake-enabled table "
                                 + tablePath
                                 + ", which shouldn't be happened. Please check 
if the lake table was deleted manually.");
 
@@ -610,6 +729,84 @@ class LakeEnabledTableCreateITCase {
         admin.alterTable(TablePath.of(DATABASE, "not_exist_table"), 
tableChanges, true).get();
     }
 
+    @Test
+    void testEnableLakeTableAfterAlterTableProperties() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // create table
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .build())
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+                        .customProperties(customProperties)
+                        .distributedBy(BUCKET_NUM, "c1", "c2")
+                        .build();
+        TablePath tablePath = TablePath.of(DATABASE, 
"enable_lake_table_after_alter_properties");
+        admin.createTable(tablePath, tableDescriptor, false).get();
+        // paimon table should not exist because lake table is disable
+        assertThatThrownBy(
+                        () ->
+                                paimonCatalog.getTable(
+                                        Identifier.create(DATABASE, 
tablePath.getTableName())))
+                .isInstanceOf(Catalog.TableNotExistException.class)
+                .hasMessage(String.format("Table %s does not exist.", 
tablePath));
+
+        // alter fluss table properties
+        List<TableChange> tableChanges =
+                Arrays.asList(TableChange.reset("k1"), TableChange.set("k2", 
"v2"));
+        admin.alterTable(tablePath, tableChanges, false).get();
+        // enable lake table should be ok
+        TableChange.SetOption enableLake =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true");
+        admin.alterTable(tablePath, Collections.singletonList(enableLake), 
false).get();
+        Table paimonTable =
+                paimonCatalog.getTable(Identifier.create(DATABASE, 
tablePath.getTableName()));
+        customProperties.remove("k1");
+        customProperties.put("k2", "v2");
+        Map<String, String> newProperties = new 
HashMap<>(tableDescriptor.getProperties());
+        newProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        tableDescriptor = tableDescriptor.withProperties(newProperties, 
customProperties);
+        verifyPaimonTable(
+                paimonTable,
+                tableDescriptor,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        },
+                        new String[] {
+                            "c1",
+                            "c2",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "c1,c2",
+                BUCKET_NUM);
+
+        // disable lake table
+        TableChange.SetOption disableLake =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"false");
+        admin.alterTable(tablePath, Collections.singletonList(disableLake), 
false).get();
+
+        // alter fluss table properties when lake table is disabled
+        tableChanges = Collections.singletonList(TableChange.set("k2", "v22"));
+        admin.alterTable(tablePath, tableChanges, false).get();
+
+        // enable lake table again should be ok, even though the table 
properties have changed
+        admin.alterTable(tablePath, Collections.singletonList(enableLake), 
false).get();
+    }
+
     private void verifyPaimonTable(
             Table paimonTable,
             TableDescriptor flussTable,
@@ -657,4 +854,54 @@ class LakeEnabledTableCreateITCase {
         RowType paimonRowType = paimonTable.rowType();
         assertThat(paimonRowType).isEqualTo(expectedRowType);
     }
+
+    private TableDescriptor createTableDescriptor(
+            int columnNum,
+            int bucketNum,
+            List<String> bucketKeys,
+            List<String> partitionKeys,
+            Map<String, String> customProperties,
+            boolean withPrimaryKeys) {
+        Schema.Builder builder = Schema.newBuilder();
+        for (int i = 1; i <= columnNum; i++) {
+            if (i % 2 == 0) {
+                builder.column("c" + i, DataTypes.INT());
+            } else {
+                builder.column("c" + i, DataTypes.STRING());
+            }
+        }
+        if (withPrimaryKeys) {
+            builder.primaryKey("c1");
+        }
+
+        return TableDescriptor.builder()
+                .schema(builder.build())
+                .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                .customProperties(customProperties)
+                .distributedBy(bucketNum, bucketKeys)
+                .partitionedBy(partitionKeys)
+                .build();
+    }
+
+    private void writeData(Table table) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+
+            for (int i = 0; i < 10; i++) {
+                GenericRow row =
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("row-" + i),
+                                0,
+                                (long) i,
+                                
Timestamp.fromEpochMillis(System.currentTimeMillis()));
+                write.write(row);
+            }
+
+            List<CommitMessage> messages = write.prepareCommit();
+            commit.commit(messages);
+        }
+        assertThat(table.latestSnapshot()).isNotEmpty();
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
index a9959d821..02b5f1da2 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -32,7 +32,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
-import java.util.Arrays;
+import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -51,9 +51,9 @@ class PaimonLakeCatalogTest {
     }
 
     @Test
-    void testAlterTableConfigs() throws Exception {
-        String database = "test_alter_table_configs_db";
-        String tableName = "test_alter_table_configs_table";
+    void testAlterTableProperties() throws Exception {
+        String database = "test_alter_table_properties_db";
+        String tableName = "test_alter_table_properties_table";
         TablePath tablePath = TablePath.of(database, tableName);
         Identifier identifier = Identifier.create(database, tableName);
         createTable(database, tableName);
@@ -65,7 +65,7 @@ class PaimonLakeCatalogTest {
         // set the value for key
         flussPaimonCatalog.alterTable(
                 tablePath,
-                Arrays.asList(TableChange.set("key", "value")),
+                Collections.singletonList(TableChange.set("key", "value")),
                 new TestingLakeCatalogContext());
 
         table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
@@ -75,7 +75,7 @@ class PaimonLakeCatalogTest {
         // reset the value for key
         flussPaimonCatalog.alterTable(
                 tablePath,
-                Arrays.asList(TableChange.reset("key")),
+                Collections.singletonList(TableChange.reset("key")),
                 new TestingLakeCatalogContext());
 
         table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index d45cc0231..54480d663 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -283,8 +283,6 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
             tableAssignment = generateAssignment(bucketCount, replicaFactor, 
servers);
         }
 
-        // TODO: should tolerate if the lake exist but matches our schema. 
This ensures eventually
-        //  consistent by idempotently creating the table multiple times. See 
#846
         // before create table in fluss, we may create in lake
         if (isDataLakeEnabled(tableDescriptor)) {
             try {
@@ -292,15 +290,10 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                         .createTable(
                                 tablePath,
                                 tableDescriptor,
-                                new 
DefaultLakeCatalogContext(currentSession().getPrincipal()));
+                                new DefaultLakeCatalogContext(
+                                        true, 
currentSession().getPrincipal()));
             } catch (TableAlreadyExistException e) {
-                throw new LakeTableAlreadyExistException(
-                        String.format(
-                                "The table %s already exists in %s catalog, 
please "
-                                        + "first drop the table in %s catalog 
or use a new table name.",
-                                tablePath,
-                                lakeCatalogContainer.getDataLakeFormat(),
-                                lakeCatalogContainer.getDataLakeFormat()));
+                throw new LakeTableAlreadyExistException(e.getMessage(), e);
             }
         }
 
@@ -330,9 +323,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                 tablePropertyChanges,
                 request.isIgnoreIfNotExists(),
                 lakeCatalogContainer.getLakeCatalog(),
-                lakeCatalogContainer.getDataLakeFormat(),
                 lakeTableTieringManager,
-                new 
DefaultLakeCatalogContext(currentSession().getPrincipal()));
+                new DefaultLakeCatalogContext(false, 
currentSession().getPrincipal()));
 
         return CompletableFuture.completedFuture(new AlterTableResponse());
     }
@@ -766,12 +758,20 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
 
     static class DefaultLakeCatalogContext implements LakeCatalog.Context {
 
+        private final boolean isCreatingFlussTable;
         private final FlussPrincipal flussPrincipal;
 
-        public DefaultLakeCatalogContext(FlussPrincipal flussPrincipal) {
+        public DefaultLakeCatalogContext(
+                boolean isCreatingFlussTable, FlussPrincipal flussPrincipal) {
+            this.isCreatingFlussTable = isCreatingFlussTable;
             this.flussPrincipal = flussPrincipal;
         }
 
+        @Override
+        public boolean isCreatingFlussTable() {
+            return isCreatingFlussTable;
+        }
+
         @Override
         public FlussPrincipal getFlussPrincipal() {
             return flussPrincipal;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 074cb04f2..ed544a8d1 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -35,7 +35,6 @@ import 
org.apache.fluss.exception.TableNotPartitionedException;
 import org.apache.fluss.exception.TooManyBucketsException;
 import org.apache.fluss.exception.TooManyPartitionsException;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
-import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -327,7 +326,6 @@ public class MetadataManager {
             TablePropertyChanges tablePropertyChanges,
             boolean ignoreIfNotExists,
             @Nullable LakeCatalog lakeCatalog,
-            @Nullable DataLakeFormat dataLakeFormat,
             LakeTableTieringManager lakeTableTieringManager,
             LakeCatalog.Context lakeCatalogContext) {
         try {
@@ -360,7 +358,6 @@ public class MetadataManager {
                         newDescriptor,
                         tableChanges,
                         lakeCatalog,
-                        dataLakeFormat,
                         lakeCatalogContext);
                 // update the table to zk
                 TableRegistration updatedTableRegistration =
@@ -400,7 +397,6 @@ public class MetadataManager {
             TableDescriptor newDescriptor,
             List<TableChange> tableChanges,
             LakeCatalog lakeCatalog,
-            DataLakeFormat dataLakeFormat,
             LakeCatalog.Context lakeCatalogContext) {
         if (isDataLakeEnabled(newDescriptor)) {
             if (lakeCatalog == null) {
@@ -410,38 +406,31 @@ public class MetadataManager {
                                 + " in data lake, because the Fluss cluster 
doesn't enable datalake tables.");
             }
 
-            boolean isLakeTableNewlyCreated = false;
             // to enable lake table
             if (!isDataLakeEnabled(tableDescriptor)) {
                 // before create table in fluss, we may create in lake
                 try {
                     lakeCatalog.createTable(tablePath, newDescriptor, 
lakeCatalogContext);
-                    // no need to alter lake table if it is newly created
-                    isLakeTableNewlyCreated = true;
                 } catch (TableAlreadyExistException e) {
-                    // TODO: should tolerate if the lake exist but matches our 
schema. This ensures
-                    // eventually consistent by idempotently creating the 
table multiple times. See
-                    // #846
-                    throw new LakeTableAlreadyExistException(
-                            String.format(
-                                    "The table %s already exists in %s 
catalog, please "
-                                            + "first drop the table in %s 
catalog or use a new table name.",
-                                    tablePath, dataLakeFormat, 
dataLakeFormat));
+                    throw new LakeTableAlreadyExistException(e.getMessage(), 
e);
                 }
             }
+        }
 
-            // only need to alter lake table if it is not newly created
-            if (!isLakeTableNewlyCreated) {
-                {
-                    try {
-                        lakeCatalog.alterTable(tablePath, tableChanges, 
lakeCatalogContext);
-                    } catch (TableNotExistException e) {
-                        throw new FlussRuntimeException(
-                                "Lake table doesn't exists for lake-enabled 
table "
-                                        + tablePath
-                                        + ", which shouldn't be happened. 
Please check if the lake table was deleted manually.",
-                                e);
-                    }
+        // We should always alter lake table even though datalake is disabled.
+        // Otherwise, if user alter the fluss table when datalake is disabled, 
then enable datalake
+        // again, the lake table will mismatch.
+        if (lakeCatalog != null) {
+            try {
+                lakeCatalog.alterTable(tablePath, tableChanges, 
lakeCatalogContext);
+            } catch (TableNotExistException e) {
+                // only throw TableNotExistException if datalake is enabled
+                if (isDataLakeEnabled(newDescriptor)) {
+                    throw new FlussRuntimeException(
+                            "Lake table doesn't exist for lake-enabled table "
+                                    + tablePath
+                                    + ", which shouldn't be happened. Please 
check if the lake table was deleted manually.",
+                            e);
                 }
             }
         }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index 8040bbd93..fb653b7d6 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -19,7 +19,7 @@ package org.apache.fluss.server.coordinator;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.LakeTableAlreadyExistException;
+import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -94,7 +94,7 @@ class LakeTableManagerITCase {
         adminGateway
                 .createTable(newCreateTableRequest(lakeTablePath, 
lakeTableDescriptor, false))
                 .get();
-        // create again, should throw TableAlreadyExistException thrown by lake
+        // create again, should throw TableAlreadyExistException thrown by 
Fluss
         assertThatThrownBy(
                         () ->
                                 adminGateway
@@ -103,9 +103,7 @@ class LakeTableManagerITCase {
                                                         lakeTablePath, 
lakeTableDescriptor, false))
                                         .get())
                 .cause()
-                .isInstanceOf(LakeTableAlreadyExistException.class)
-                .hasMessage(
-                        "The table %s already exists in paimon catalog, please 
first drop the table in paimon catalog or use a new table name.",
-                        lakeTablePath);
+                .isInstanceOf(TableAlreadyExistException.class)
+                .hasMessage("Table %s already exists.", lakeTablePath);
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
index c726e22e3..aa030d025 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
@@ -78,7 +78,10 @@ public class TestingPaimonStoragePlugin implements 
LakeStoragePlugin {
                 TablePath tablePath, TableDescriptor tableDescriptor, Context 
context)
                 throws TableAlreadyExistException {
             if (tableByPath.containsKey(tablePath)) {
-                throw new TableAlreadyExistException("Table " + tablePath + " 
already exists");
+                TableDescriptor existingTable = tableByPath.get(tablePath);
+                if (!existingTable.equals(tableDescriptor)) {
+                    throw new TableAlreadyExistException("Table " + tablePath 
+ " already exists.");
+                }
             }
             tableByPath.put(tablePath, tableDescriptor);
         }
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 6a03e377a..0c6f54cd5 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -384,7 +384,7 @@
                                         
<exclude>org.apache.fluss.flink.DummyClass120</exclude>
                                         
<exclude>org.apache.fluss.lake.batch.ArrowRecordBatch</exclude>
                                         
<exclude>org.apache.fluss.lake.committer.CommittedLakeSnapshot</exclude>
-                                        
<exclude>org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType</exclude>
+                                        
<exclude>org.apache.fluss.lake.paimon.utils.FlussDataTypeToPaimonDataType</exclude>
                                         <!-- start exclude for lake lance -->
                                         
<exclude>org.apache.fluss.lake.lance.*</exclude>
                                         <!-- temporarily exclude iceberg -->


Reply via email to