This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c8ce3350a [flink] CDC schema evolution should pass catalog information 
(#1521)
c8ce3350a is described below

commit c8ce3350a7b9dc7fc42c7f07f98af7e8d6d652f6
Author: yuzelin <[email protected]>
AuthorDate: Sun Jul 9 10:57:49 2023 +0800

    [flink] CDC schema evolution should pass catalog information (#1521)
---
 docs/content/api/flink-api.md                      | 17 ++++-
 .../java/org/apache/paimon/catalog/Catalog.java    |  8 ++-
 .../apache/paimon/catalog/FileSystemCatalog.java   |  8 +--
 .../org/apache/paimon/schema/SchemaManager.java    | 14 +++--
 .../org/apache/paimon/catalog/CatalogTestBase.java | 72 +++++++++++++---------
 .../paimon/table/SchemaEvolutionTableTestBase.java |  2 +-
 .../apache/paimon/table/SchemaEvolutionTest.java   | 12 ++--
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  8 ++-
 .../org/apache/paimon/flink/action/ActionBase.java | 13 +++-
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  |  4 +-
 .../action/cdc/kafka/KafkaSyncTableAction.java     |  4 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 13 +---
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  4 +-
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      | 21 ++++++-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  | 39 +++++++-----
 ...MultiTableUpdatedDataFieldsProcessFunction.java | 13 ++--
 .../paimon/flink/sink/cdc/RichCdcSinkBuilder.java  | 16 +++++
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java | 25 ++++++--
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 29 ++++++---
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  7 ++-
 .../cdc/mysql/TestAlterTableCatalogFactory.java    | 59 ++++++++++++++++++
 .../mysql/TestCaseInsensitiveCatalogFactory.java   |  3 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   | 10 +++
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      | 31 +++++++++-
 .../org.apache.paimon.catalog.CatalogFactory       |  1 +
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 30 ++++-----
 .../java/org/apache/paimon/spark/SparkCatalog.java |  4 +-
 .../paimon/spark/SparkSchemaEvolutionITCase.java   | 37 ++++++-----
 28 files changed, 361 insertions(+), 143 deletions(-)

diff --git a/docs/content/api/flink-api.md b/docs/content/api/flink-api.md
index d39952b10..f2dd1f8c4 100644
--- a/docs/content/api/flink-api.md
+++ b/docs/content/api/flink-api.md
@@ -159,6 +159,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.sink.cdc.RichCdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcSinkBuilder;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataTypes;
@@ -186,12 +187,23 @@ public class WriteCdcToTable {
                                 .field("dt", DataTypes.TIMESTAMP(), 
"2023-06-12 20:21:12")
                                 .build());
 
-        new 
RichCdcSinkBuilder().withInput(dataStream).withTable(createTableIfNotExists()).build();
+        Identifier identifier = Identifier.create("my_db", "T");
+        Options catalogOptions = new Options();
+        catalogOptions.set("warehouse", "/path/to/warehouse");
+        Catalog.Loader catalogLoader = 
+                () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        
+        new RichCdcSinkBuilder()
+                .withInput(dataStream)
+                .withTable(createTableIfNotExists(identifier))
+                .withIdentifier(identifier)
+                .withCatalogLoader(catalogLoader)
+                .build();
 
         env.execute();
     }
 
-    private static Table createTableIfNotExists() throws Exception {
+    private static Table createTableIfNotExists(Identifier identifier) throws 
Exception {
         CatalogContext context = CatalogContext.create(new Path("..."));
         Catalog catalog = CatalogFactory.createCatalog(context);
 
@@ -200,7 +212,6 @@ public class WriteCdcToTable {
         schemaBuilder.column("order_id", DataTypes.BIGINT());
         schemaBuilder.column("price", DataTypes.DOUBLE());
         Schema schema = schemaBuilder.build();
-        Identifier identifier = Identifier.create("my_db", "T");
         try {
             catalog.createTable(identifier, schema, false);
         } catch (Catalog.TableAlreadyExistException e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index b26d7fe37..0004680c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
@@ -181,7 +182,12 @@ public interface Catalog extends AutoCloseable {
      * @throws TableNotExistException if the table does not exist
      */
     void alterTable(Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
-            throws TableNotExistException;
+            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException;
+
+    default void alterTable(Identifier identifier, SchemaChange change, 
boolean ignoreIfNotExists)
+            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        alterTable(identifier, Collections.singletonList(change), 
ignoreIfNotExists);
+    }
 
     /** Return a boolean that indicates whether this catalog is 
case-sensitive. */
     default boolean caseSensitive() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 8a52c5182..16a99b1b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -189,15 +189,13 @@ public class FileSystemCatalog extends AbstractCatalog {
     @Override
     public void alterTable(
             Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
-            throws TableNotExistException {
+            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
         checkNotSystemTable(identifier, "alterTable");
         if (!tableExists(getDataTableLocation(identifier))) {
             throw new TableNotExistException(identifier);
         }
-        uncheck(
-                () ->
-                        new SchemaManager(fileIO, 
getDataTableLocation(identifier))
-                                .commitChanges(changes));
+
+        new SchemaManager(fileIO, 
getDataTableLocation(identifier)).commitChanges(changes);
     }
 
     private static <T> T uncheck(Callable<T> callable) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index e47191b81..f46d3b8c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -187,7 +187,9 @@ public class SchemaManager implements Serializable {
     }
 
     /** Update {@link SchemaChange}s. */
-    public TableSchema commitChanges(List<SchemaChange> changes) throws 
Exception {
+    public TableSchema commitChanges(List<SchemaChange> changes)
+            throws Catalog.TableNotExistException, 
Catalog.ColumnAlreadyExistException,
+                    Catalog.ColumnNotExistException {
         while (true) {
             TableSchema schema =
                     latest().orElseThrow(
@@ -372,9 +374,13 @@ public class SchemaManager implements Serializable {
                             newOptions,
                             schema.comment());
 
-            boolean success = commit(newSchema);
-            if (success) {
-                return newSchema;
+            try {
+                boolean success = commit(newSchema);
+                if (success) {
+                    return newSchema;
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
         }
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index b498e2d06..9c71fb57d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -468,8 +469,10 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 SchemaChange.addColumn("col1", 
DataTypes.INT())),
                                         false))
-                
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
-                .hasRootCauseMessage("Column col1 already exists in the 
test_db.test_table table.");
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnAlreadyExistException.class,
+                                "Column col1 already exists in the 
test_db.test_table table."));
     }
 
     @Test
@@ -506,8 +509,10 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 
SchemaChange.renameColumn("col1", "new_col1")),
                                         false))
-                
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
-                .hasRootCauseMessage("Column col1 already exists in the 
test_db.test_table table.");
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnAlreadyExistException.class,
+                                "Column col1 already exists in the 
test_db.test_table table."));
 
         // Alter table renames a column throws ColumnNotExistException when 
column does not exist
         assertThatThrownBy(
@@ -518,9 +523,10 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.renameColumn(
                                                         "non_existing_col", 
"new_col2")),
                                         false))
-                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
-                .hasRootCauseMessage(
-                        "Column [non_existing_col] does not exist in the 
test_db.test_table table.");
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column [non_existing_col] does not exist in 
the test_db.test_table table."));
     }
 
     @Test
@@ -553,8 +559,9 @@ public abstract class CatalogTestBase {
                                         identifier,
                                         
Lists.newArrayList(SchemaChange.dropColumn("col2")),
                                         false))
-                .hasRootCauseInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining(" Cannot drop all fields in table");
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class, "Cannot drop 
all fields in table"));
 
         // Alter table drop a column throws ColumnNotExistException when 
column does not exist
         assertThatThrownBy(
@@ -564,9 +571,10 @@ public abstract class CatalogTestBase {
                                         Lists.newArrayList(
                                                 
SchemaChange.dropColumn("non_existing_col")),
                                         false))
-                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
-                .hasRootCauseMessage(
-                        "Column non_existing_col does not exist in the 
test_db.test_table table.");
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
     }
 
     @Test
@@ -605,9 +613,10 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.updateColumnType(
                                                         "col1", 
DataTypes.DATE())),
                                         false))
-                .hasRootCauseInstanceOf(IllegalStateException.class)
-                .hasMessageContaining(
-                        "Column type col1[DOUBLE] cannot be converted to DATE 
without loosing information.");
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalStateException.class,
+                                "Column type col1[DOUBLE] cannot be converted 
to DATE without loosing information."));
 
         // Alter table update a column type throws ColumnNotExistException 
when column does not
         // exist
@@ -619,9 +628,10 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.updateColumnType(
                                                         "non_existing_col", 
DataTypes.INT())),
                                         false))
-                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
-                .hasRootCauseMessage(
-                        "Column [non_existing_col] does not exist in the 
test_db.test_table table.");
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column [non_existing_col] does not exist in 
the test_db.test_table table."));
         // Alter table update a column type throws Exception when column is 
partition columns
         assertThatThrownBy(
                         () ->
@@ -631,8 +641,10 @@ public abstract class CatalogTestBase {
                                                 SchemaChange.updateColumnType(
                                                         "dt", 
DataTypes.DATE())),
                                         false))
-                .hasRootCauseInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining("Cannot update partition column [dt] 
type in the table");
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Cannot update partition column [dt] type in 
the table"));
     }
 
     @Test
@@ -687,9 +699,10 @@ public abstract class CatalogTestBase {
                                                 
SchemaChange.updateColumnComment(
                                                         new String[] 
{"non_existing_col"}, "")),
                                         false))
-                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
-                .hasMessageContaining(
-                        "Column [non_existing_col] does not exist in the 
test_db.test_table table.");
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column [non_existing_col] does not exist in 
the test_db.test_table table."));
     }
 
     @Test
@@ -742,9 +755,10 @@ public abstract class CatalogTestBase {
                                                 
SchemaChange.updateColumnNullability(
                                                         new String[] 
{"non_existing_col"}, false)),
                                         false))
-                .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
-                .hasMessageContaining(
-                        "Column [non_existing_col] does not exist in the 
test_db.test_table table.");
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column [non_existing_col] does not exist in 
the test_db.test_table table."));
 
         // Alter table update a column nullability throws Exception when 
column is pk columns
         assertThatThrownBy(
@@ -755,7 +769,9 @@ public abstract class CatalogTestBase {
                                                 
SchemaChange.updateColumnNullability(
                                                         new String[] {"col2"}, 
true)),
                                         false))
-                .hasRootCauseInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Cannot change nullability of primary 
key");
+                .satisfies(
+                        anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot change nullability of primary key"));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index e2630744b..ad76e557a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -513,7 +513,7 @@ public abstract class SchemaEvolutionTableTestBase {
         }
 
         @Override
-        public TableSchema commitChanges(List<SchemaChange> changes) throws 
Exception {
+        public TableSchema commitChanges(List<SchemaChange> changes) {
             throw new UnsupportedOperationException();
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 184dea774..163434cc5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -332,11 +333,12 @@ public class SchemaEvolutionTest {
                                 schemaManager.commitChanges(
                                         Collections.singletonList(
                                                 
SchemaChange.renameColumn("f0", "_VALUE_KIND"))))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessage(
-                        String.format(
-                                "Field name[%s] in schema cannot be exist in 
%s",
-                                "_VALUE_KIND", SYSTEM_FIELD_NAMES));
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                RuntimeException.class,
+                                String.format(
+                                        "Field name[%s] in schema cannot be 
exist in %s",
+                                        "_VALUE_KIND", SYSTEM_FIELD_NAMES)));
     }
 
     private List<String> readRecords(FileStoreTable table, Predicate filter) 
throws IOException {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index b7a4bca89..f3df315bb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -384,7 +384,9 @@ public class FlinkCatalog extends AbstractCatalog {
 
         try {
             catalog.alterTable(toIdentifier(tablePath), changes, 
ignoreIfNotExists);
-        } catch (Catalog.TableNotExistException e) {
+        } catch (Catalog.TableNotExistException
+                | Catalog.ColumnAlreadyExistException
+                | Catalog.ColumnNotExistException e) {
             throw new TableNotExistException(getName(), tablePath);
         }
     }
@@ -415,7 +417,9 @@ public class FlinkCatalog extends AbstractCatalog {
 
         try {
             catalog.alterTable(toIdentifier(tablePath), changes, 
ignoreIfNotExists);
-        } catch (Catalog.TableNotExistException e) {
+        } catch (Catalog.TableNotExistException
+                | Catalog.ColumnAlreadyExistException
+                | Catalog.ColumnNotExistException e) {
             throw new TableNotExistException(getName(), tablePath);
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 49e805393..0e4eaa654 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -35,15 +35,16 @@ import java.util.UUID;
 /** Abstract base of {@link Action} for table. */
 public abstract class ActionBase implements Action {
 
-    protected final Catalog catalog;
+    private final Options catalogOptions;
 
+    protected final Catalog catalog;
     protected final FlinkCatalog flinkCatalog;
-
     protected final String catalogName = "paimon-" + UUID.randomUUID();
 
     public ActionBase(String warehouse, Map<String, String> catalogConfig) {
-        Options catalogOptions = Options.fromMap(catalogConfig);
+        catalogOptions = Options.fromMap(catalogConfig);
         catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
+
         catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog);
     }
@@ -53,4 +54,10 @@ public abstract class ActionBase implements Action {
         String name = 
conf.getOptional(PipelineOptions.NAME).orElse(defaultName);
         env.execute(name);
     }
+
+    protected Catalog.Loader catalogLoader() {
+        // to make the action workflow serializable
+        Options catalogOptions = this.catalogOptions;
+        return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 676c9d344..f254ac5b4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -229,7 +229,9 @@ public class KafkaSyncDatabaseAction extends ActionBase {
                                 env.fromSource(
                                         source, 
WatermarkStrategy.noWatermarks(), "Kafka Source"))
                         .withParserFactory(parserFactory)
-                        .withTables(fileStoreTables);
+                        .withTables(fileStoreTables)
+                        .withCatalogLoader(catalogLoader())
+                        .withDatabase(database);
         String sinkParallelism = 
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
         if (sinkParallelism != null) {
             sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index cbcef632c..418a83102 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -187,7 +187,9 @@ public class KafkaSyncTableAction extends ActionBase {
                                 env.fromSource(
                                         source, 
WatermarkStrategy.noWatermarks(), "Kafka Source"))
                         .withParserFactory(parserFactory)
-                        .withTable(table);
+                        .withTable(table)
+                        .withIdentifier(identifier)
+                        .withCatalogLoader(catalogLoader());
         String sinkParallelism = 
paimonConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
         if (sinkParallelism != null) {
             sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 1fe314242..97cde5d17 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -20,15 +20,12 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
@@ -114,7 +111,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
     @Nullable private final Pattern excludingPattern;
     private final Map<String, String> tableConfig;
     private final String includingTables;
-    private final Options catalogOptions;
     private final MySqlDatabaseSyncMode mode;
 
     public MySqlSyncDatabaseAction(
@@ -159,8 +155,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         this.includingTables = includingTables == null ? ".*" : 
includingTables;
         this.includingPattern = Pattern.compile(this.includingTables);
         this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
-        this.catalogOptions = Options.fromMap(catalogConfig);
-        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
         this.tableConfig = tableConfig;
         this.mode = mode;
     }
@@ -249,8 +243,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
 
         String database = this.database;
         MySqlDatabaseSyncMode mode = this.mode;
-        // To make the sync database workflow serializable
-        Options catalogOptions = this.catalogOptions;
         FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
                 new FlinkCdcSyncDatabaseSinkBuilder<String>()
                         .withInput(
@@ -258,10 +250,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                                         source, 
WatermarkStrategy.noWatermarks(), "MySQL Source"))
                         .withParserFactory(parserFactory)
                         .withDatabase(database)
-                        .withCatalogLoader(
-                                () -> {
-                                    return 
FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
-                                })
+                        .withCatalogLoader(catalogLoader())
                         .withTables(fileStoreTables)
                         .withMode(mode);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 5b83ba0f7..8382aee6d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -191,7 +191,9 @@ public class MySqlSyncTableAction extends ActionBase {
                                 env.fromSource(
                                         source, 
WatermarkStrategy.noWatermarks(), "MySQL Source"))
                         .withParserFactory(parserFactory)
-                        .withTable(table);
+                        .withTable(table)
+                        .withIdentifier(identifier)
+                        .withCatalogLoader(catalogLoader());
         String sinkParallelism = 
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
         if (sinkParallelism != null) {
             sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 4d38b869f..5fd0f80d3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
@@ -45,6 +47,8 @@ public class CdcSinkBuilder<T> {
     private DataStream<T> input = null;
     private EventParser.Factory<T> parserFactory = null;
     private Table table = null;
+    private Identifier identifier = null;
+    private Catalog.Loader catalogLoader = null;
 
     @Nullable private Integer parallelism;
 
@@ -68,10 +72,22 @@ public class CdcSinkBuilder<T> {
         return this;
     }
 
+    public CdcSinkBuilder<T> withIdentifier(Identifier identifier) {
+        this.identifier = identifier;
+        return this;
+    }
+
+    public CdcSinkBuilder<T> withCatalogLoader(Catalog.Loader catalogLoader) {
+        this.catalogLoader = catalogLoader;
+        return this;
+    }
+
     public DataStreamSink<?> build() {
         Preconditions.checkNotNull(input, "Input DataStream can not be null.");
         Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not 
be null.");
         Preconditions.checkNotNull(table, "Paimon Table can not be null.");
+        Preconditions.checkNotNull(identifier, "Paimon Table Identifier can 
not be null.");
+        Preconditions.checkNotNull(catalogLoader, "Paimon Catalog Loader can 
not be null.");
 
         if (!(table instanceof FileStoreTable)) {
             throw new IllegalArgumentException(
@@ -90,8 +106,9 @@ public class CdcSinkBuilder<T> {
                                 parsed, 
CdcParsingProcessFunction.NEW_DATA_FIELD_LIST_OUTPUT_TAG)
                         .process(
                                 new UpdatedDataFieldsProcessFunction(
-                                        new SchemaManager(
-                                                dataTable.fileIO(), 
dataTable.location())));
+                                        new SchemaManager(dataTable.fileIO(), 
dataTable.location()),
+                                        identifier,
+                                        catalogLoader));
         schemaChangeProcessFunction.getTransformation().setParallelism(1);
         schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 404fcbd87..6ba1278a1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
@@ -90,9 +91,26 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         return this;
     }
 
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withDatabase(String database) {
+        this.database = database;
+        return this;
+    }
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(Catalog.Loader 
catalogLoader) {
+        this.catalogLoader = catalogLoader;
+        return this;
+    }
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MySqlDatabaseSyncMode 
mode) {
+        this.mode = mode;
+        return this;
+    }
+
     public void build() {
         Preconditions.checkNotNull(input);
         Preconditions.checkNotNull(parserFactory);
+        Preconditions.checkNotNull(database);
+        Preconditions.checkNotNull(catalogLoader);
 
         if (mode == UNIFIED) {
             buildUnifiedCdcSink();
@@ -145,6 +163,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     }
 
     private void buildStaticCdcSink() {
+        Preconditions.checkNotNull(tables);
+
         SingleOutputStreamOperator<Void> parsed =
                 input.forward()
                         .process(new 
CdcMultiTableParsingProcessFunction<>(parserFactory))
@@ -158,7 +178,9 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                                             
.createUpdatedDataFieldsOutputTag(table.name()))
                             .process(
                                     new UpdatedDataFieldsProcessFunction(
-                                            new SchemaManager(table.fileIO(), 
table.location())));
+                                            new SchemaManager(table.fileIO(), 
table.location()),
+                                            Identifier.create(database, 
table.name()),
+                                            catalogLoader));
             schemaChangeProcessFunction.getTransformation().setParallelism(1);
             
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
 
@@ -183,19 +205,4 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
             }
         }
     }
-
-    public FlinkCdcSyncDatabaseSinkBuilder<T> withDatabase(String database) {
-        this.database = database;
-        return this;
-    }
-
-    public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(Catalog.Loader 
catalogLoader) {
-        this.catalogLoader = catalogLoader;
-        return this;
-    }
-
-    public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MySqlDatabaseSyncMode 
mode) {
-        this.mode = mode;
-        return this;
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 1f0c800d6..834930355 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -98,7 +98,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
 
         for (SchemaChange schemaChange :
                 extractSchemaChanges(schemaManager, updatedDataFields.f1)) {
-            applySchemaChange(schemaManager, schemaChange);
+            applySchemaChange(schemaManager, schemaChange, tableId);
         }
     }
 
@@ -139,14 +139,15 @@ public class MultiTableUpdatedDataFieldsProcessFunction
         return result;
     }
 
-    private void applySchemaChange(SchemaManager schemaManager, SchemaChange 
schemaChange)
+    private void applySchemaChange(
+            SchemaManager schemaManager, SchemaChange schemaChange, Identifier 
identifier)
             throws Exception {
         if (schemaChange instanceof SchemaChange.AddColumn) {
             try {
-                schemaManager.commitChanges(schemaChange);
+                catalog.alterTable(identifier, schemaChange, false);
             } catch (Catalog.ColumnAlreadyExistException e) {
                 // This is normal. For example when a table is split into 
multiple database tables,
-                // all these tables will be added the same column. However 
schemaManager can't
+                // all these tables will be added the same column. However, 
schemaManager can't
                 // handle duplicated column adds, so we just catch the 
exception and log it.
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
@@ -176,7 +177,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
             DataType newType = updateColumnType.newDataType();
             switch (canConvert(oldType, newType)) {
                 case CONVERT:
-                    schemaManager.commitChanges(schemaChange);
+                    catalog.alterTable(identifier, schemaChange, false);
                     break;
                 case EXCEPTION:
                     throw new UnsupportedOperationException(
@@ -185,7 +186,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
                                     updateColumnType.fieldName(), oldType, 
newType));
             }
         } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
-            schemaManager.commitChanges(schemaChange);
+            catalog.alterTable(identifier, schemaChange, false);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported schema change class "
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 130d3812c..5648b695a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.Table;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -32,6 +34,8 @@ public class RichCdcSinkBuilder {
 
     private DataStream<RichCdcRecord> input = null;
     private Table table = null;
+    private Identifier identifier = null;
+    private Catalog.Loader catalogLoader = null;
 
     @Nullable private Integer parallelism;
 
@@ -50,12 +54,24 @@ public class RichCdcSinkBuilder {
         return this;
     }
 
+    public RichCdcSinkBuilder withIdentifier(Identifier identifier) {
+        this.identifier = identifier;
+        return this;
+    }
+
+    public RichCdcSinkBuilder withCatalogLoader(Catalog.Loader catalogLoader) {
+        this.catalogLoader = catalogLoader;
+        return this;
+    }
+
     public DataStreamSink<?> build() {
         CdcSinkBuilder<RichCdcRecord> builder = new CdcSinkBuilder<>();
         return builder.withTable(table)
                 .withInput(input)
                 .withParserFactory(new RichCdcParserFactory())
                 .withParallelism(parallelism)
+                .withIdentifier(identifier)
+                .withCatalogLoader(catalogLoader)
                 .build();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 5ff10d82d..b179dbc9e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -29,6 +30,7 @@ import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
@@ -55,8 +57,21 @@ public class UpdatedDataFieldsProcessFunction extends 
ProcessFunction<List<DataF
 
     private final SchemaManager schemaManager;
 
-    public UpdatedDataFieldsProcessFunction(SchemaManager schemaManager) {
+    private final Catalog.Loader catalogLoader;
+    private final Identifier identifier;
+
+    private Catalog catalog;
+
+    public UpdatedDataFieldsProcessFunction(
+            SchemaManager schemaManager, Identifier identifier, Catalog.Loader 
catalogLoader) {
         this.schemaManager = schemaManager;
+        this.identifier = identifier;
+        this.catalogLoader = catalogLoader;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+        this.catalog = catalogLoader.load();
     }
 
     @Override
@@ -107,10 +122,10 @@ public class UpdatedDataFieldsProcessFunction extends 
ProcessFunction<List<DataF
     private void applySchemaChange(SchemaChange schemaChange) throws Exception 
{
         if (schemaChange instanceof SchemaChange.AddColumn) {
             try {
-                schemaManager.commitChanges(schemaChange);
+                catalog.alterTable(identifier, schemaChange, false);
             } catch (Catalog.ColumnAlreadyExistException e) {
                 // This is normal. For example when a table is split into 
multiple database tables,
-                // all these tables will be added the same column. However 
schemaManager can't
+                // all these tables will be added the same column. However, 
schemaManager can't
                 // handle duplicated column adds, so we just catch the 
exception and log it.
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
@@ -140,7 +155,7 @@ public class UpdatedDataFieldsProcessFunction extends 
ProcessFunction<List<DataF
             DataType newType = updateColumnType.newDataType();
             switch (canConvert(oldType, newType)) {
                 case CONVERT:
-                    schemaManager.commitChanges(schemaChange);
+                    catalog.alterTable(identifier, schemaChange, false);
                     break;
                 case EXCEPTION:
                     throw new UnsupportedOperationException(
@@ -149,7 +164,7 @@ public class UpdatedDataFieldsProcessFunction extends 
ProcessFunction<List<DataF
                                     updateColumnType.fieldName(), oldType, 
newType));
             }
         } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
-            schemaManager.commitChanges(schemaChange);
+            catalog.alterTable(identifier, schemaChange, false);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported schema change class "
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 50b4611c4..0e3143a0a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -559,6 +559,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         assertTableNotExists(notExistedTables);
     }
 
+    @Test
     @Timeout(60)
     public void testIgnoreCase() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
@@ -671,7 +672,8 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
             boolean testSchemaChange,
             String databaseName)
             throws Exception {
-        JobClient client = 
buildSyncDatabaseActionWithNewlyAddedTables(databaseName);
+        JobClient client =
+                buildSyncDatabaseActionWithNewlyAddedTables(databaseName, 
testSchemaChange);
         waitJobRunning(client);
 
         try (Connection conn =
@@ -759,7 +761,9 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                             .join();
             assertThat(savepoint).isNotBlank();
 
-            client = buildSyncDatabaseActionWithNewlyAddedTables(savepoint, 
databaseName);
+            client =
+                    buildSyncDatabaseActionWithNewlyAddedTables(
+                            savepoint, databaseName, testSchemaChange);
             waitJobRunning(client);
         }
 
@@ -832,6 +836,10 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                             },
                             new String[] {"k", "v1", "v2"});
             waitForResult(expectedRecords, newTable, rowType, 
newTablePrimaryKeys);
+
+            // test that catalog loader works
+            assertThat(getFileStoreTable(tableName).options())
+                    .containsEntry("alter-table-test", "true");
         }
     }
 
@@ -877,13 +885,13 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         "CREATE TABLE %s (k INT, v1 VARCHAR(10), PRIMARY KEY 
(k))", newTableName));
     }
 
-    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(String 
databaseName)
-            throws Exception {
-        return buildSyncDatabaseActionWithNewlyAddedTables(null, databaseName);
+    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
+            String databaseName, boolean testSchemaChange) throws Exception {
+        return buildSyncDatabaseActionWithNewlyAddedTables(null, databaseName, 
testSchemaChange);
     }
 
     private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
-            String savepointPath, String databaseName) throws Exception {
+            String savepointPath, String databaseName, boolean 
testSchemaChange) throws Exception {
 
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", databaseName);
@@ -897,6 +905,13 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         Map<String, String> tableConfig = new HashMap<>();
         tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
         tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(2) + 
2));
+
+        Map<String, String> catalogConfig =
+                testSchemaChange
+                        ? Collections.singletonMap(
+                                CatalogOptions.METASTORE.key(), 
"test-alter-table")
+                        : Collections.emptyMap();
+
         MySqlSyncDatabaseAction action =
                 new MySqlSyncDatabaseAction(
                         mySqlConfig,
@@ -907,7 +922,7 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         null,
                         "t.+",
                         null,
-                        Collections.emptyMap(),
+                        catalogConfig,
                         tableConfig,
                         UNIFIED);
         action.build(env);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 30bde7232..8a608e674 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
@@ -79,7 +80,8 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.emptyMap(),
+                        Collections.singletonMap(
+                                CatalogOptions.METASTORE.key(), 
"test-alter-table"),
                         tableConfig);
         action.build(env);
         JobClient client = env.executeAsync();
@@ -245,6 +247,9 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         "+I[2, 8, very long string, 80000000000, NULL, NULL, 
NULL]",
                         "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 
110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
         waitForResult(expected, table, rowType, primaryKeys);
+
+        // test that catalog loader works
+        
assertThat(getFileStoreTable().options()).containsEntry("alter-table-test", 
"true");
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
new file mode 100644
index 000000000..d7f9f7ce1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestAlterTableCatalogFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaChange;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Factory to create a mock catalog for catalog loader test. If catalog loader 
works, the
+ * 'alterTable' method will leave a special option.
+ */
+public class TestAlterTableCatalogFactory implements CatalogFactory {
+
+    @Override
+    public String identifier() {
+        return "test-alter-table";
+    }
+
+    @Override
+    public Catalog create(FileIO fileIO, Path warehouse, CatalogContext 
context) {
+        return new FileSystemCatalog(fileIO, warehouse, 
context.options().toMap()) {
+
+            @Override
+            public void alterTable(
+                    Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
+                    throws TableNotExistException, ColumnAlreadyExistException,
+                            ColumnNotExistException {
+                List<SchemaChange> newChanges = new ArrayList<>(changes);
+                newChanges.add(SchemaChange.setOption("alter-table-test", 
"true"));
+                super.alterTable(identifier, newChanges, ignoreIfNotExists);
+            }
+        };
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
index e0ba24b9a..82caabd7b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TestCaseInsensitiveCatalogFactory.java
@@ -71,7 +71,8 @@ public class TestCaseInsensitiveCatalogFactory implements 
CatalogFactory {
             @Override
             public void alterTable(
                     Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
-                    throws TableNotExistException {
+                    throws TableNotExistException, ColumnAlreadyExistException,
+                            ColumnNotExistException {
                 for (SchemaChange change : changes) {
                     if (change instanceof SchemaChange.AddColumn) {
                         checkCaseInsensitive(((SchemaChange.AddColumn) 
change).fieldName());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index d3cf58e0f..c8b0a754b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -19,8 +19,10 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -146,6 +148,12 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
         TestCdcSourceFunction sourceFunction = new 
TestCdcSourceFunction(events);
         DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
         source.setParallelism(2);
+
+        Options catalogOptions = new Options();
+        catalogOptions.set("warehouse", tempDir.toString());
+        Catalog.Loader catalogLoader =
+                () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+
         new FlinkCdcSyncDatabaseSinkBuilder<TestCdcEvent>()
                 .withInput(source)
                 .withParserFactory(TestCdcEventParser::new)
@@ -153,6 +161,8 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
                 // because we have at most 3 tables and 8 slots in 
AbstractTestBase
                 // each table can only get 2 slots
                 .withParallelism(2)
+                .withDatabase(DATABASE_NAME)
+                .withCatalogLoader(catalogLoader)
                 .build();
 
         // enable failure when running jobs if needed
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index e47cb03d9..a16447364 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -19,7 +19,11 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogUtils;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -55,6 +59,9 @@ import java.util.concurrent.ThreadLocalRandom;
 /** IT cases for {@link CdcSinkBuilder}. */
 public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
 
+    private static final String DATABASE_NAME = "test";
+    private static final String TABLE_NAME = "test_tbl";
+
     @TempDir java.nio.file.Path tempDir;
 
     @Test
@@ -79,16 +86,26 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
         boolean enableFailure = random.nextBoolean();
 
         TestTable testTable =
-                new TestTable("test_tbl", numEvents, numSchemaChanges, 
numPartitions, numKeys);
+                new TestTable(TABLE_NAME, numEvents, numSchemaChanges, 
numPartitions, numKeys);
 
         Path tablePath;
         FileIO fileIO;
         String failingName = UUID.randomUUID().toString();
         if (enableFailure) {
-            tablePath = new Path(FailingFileIO.getFailingPath(failingName, 
tempDir.toString()));
+            tablePath =
+                    new Path(
+                            FailingFileIO.getFailingPath(
+                                    failingName,
+                                    CatalogUtils.stringifyPath(
+                                            tempDir.toString(), DATABASE_NAME, 
TABLE_NAME)));
             fileIO = new FailingFileIO();
         } else {
-            tablePath = new Path(TraceableFileIO.SCHEME + "://" + 
tempDir.toString());
+            tablePath =
+                    new Path(
+                            TraceableFileIO.SCHEME
+                                    + "://"
+                                    + CatalogUtils.stringifyPath(
+                                            tempDir.toString(), DATABASE_NAME, 
TABLE_NAME));
             fileIO = LocalFileIO.create();
         }
 
@@ -113,11 +130,19 @@ public class FlinkCdcSyncTableSinkITCase extends 
AbstractTestBase {
         TestCdcSourceFunction sourceFunction = new 
TestCdcSourceFunction(testTable.events());
         DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
         source.setParallelism(2);
+
+        Options catalogOptions = new Options();
+        catalogOptions.set("warehouse", tempDir.toString());
+        Catalog.Loader catalogLoader =
+                () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+
         new CdcSinkBuilder<TestCdcEvent>()
                 .withInput(source)
                 .withParserFactory(TestCdcEventParser::new)
                 .withTable(table)
                 .withParallelism(3)
+                .withIdentifier(Identifier.create(DATABASE_NAME, TABLE_NAME))
+                .withCatalogLoader(catalogLoader)
                 .build();
 
         // enable failure when running jobs if needed
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
index 3474fc387..faa8cf673 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory
+org.apache.paimon.flink.action.cdc.mysql.TestAlterTableCatalogFactory
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 29f26f14f..ea73a0995 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -343,7 +343,7 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     public void alterTable(
             Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
-            throws TableNotExistException {
+            throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
         checkNotSystemTable(identifier, "alterTable");
         if (!paimonTableExists(identifier)) {
             if (ignoreIfNotExists) {
@@ -354,23 +354,19 @@ public class HiveCatalog extends AbstractCatalog {
         }
 
         checkFieldNamesUpperCaseInSchemaChange(changes);
-        try {
-            final SchemaManager schemaManager = schemaManager(identifier);
-            // first commit changes to underlying files
-            TableSchema schema = schemaManager.commitChanges(changes);
 
-            try {
-                // sync to hive hms
-                Table table =
-                        client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
-                updateHmsTable(table, identifier, schema);
-                client.alter_table(identifier.getDatabaseName(), identifier.getObjectName(), table);
-            } catch (TException te) {
-                schemaManager.deleteSchema(schema.id());
-                throw te;
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+        final SchemaManager schemaManager = schemaManager(identifier);
+        // first commit changes to underlying files
+        TableSchema schema = schemaManager.commitChanges(changes);
+
+        try {
+            // sync to hive hms
+            Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
+            updateHmsTable(table, identifier, schema);
+            client.alter_table(identifier.getDatabaseName(), identifier.getObjectName(), table);
+        } catch (TException te) {
+            schemaManager.deleteSchema(schema.id());
+            throw new RuntimeException(te);
         }
     }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 3e93d5d8e..a50fe1abb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -278,7 +278,9 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         try {
             catalog.alterTable(toIdentifier(ident), schemaChanges, false);
             return loadTable(ident);
-        } catch (Catalog.TableNotExistException e) {
+        } catch (Catalog.TableNotExistException
+                | Catalog.ColumnAlreadyExistException
+                | Catalog.ColumnNotExistException e) {
             throw new NoSuchTableException(ident);
         }
     }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 7ed2ed2eb..64cf6519b 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.spark;
 
-import org.apache.paimon.testutils.assertj.AssertionUtils;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -31,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -97,9 +96,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         () ->
                                 spark.sql(
                                         "ALTER TABLE testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
-                .isInstanceOf(RuntimeException.class)
-                .hasMessage(
-                        "java.lang.IllegalArgumentException: ADD COLUMN cannot specify NOT NULL.");
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "ADD COLUMN cannot specify NOT NULL."));
     }
 
     @Test
@@ -193,9 +193,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
 
         assertThatThrownBy(
                         () -> spark.sql("ALTER TABLE testRenamePartitionKey RENAME COLUMN a to aa"))
-                .isInstanceOf(RuntimeException.class)
-                .hasMessage(
-                        "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[a]");
+                .satisfies(
+                        anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot drop/rename partition key[a]"));
     }
 
     @Test
@@ -242,9 +243,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                 .contains(showCreateString("testDropPartitionKey", "a BIGINT", "b STRING"));
 
         assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPartitionKey DROP COLUMN a"))
-                .isInstanceOf(RuntimeException.class)
-                .hasMessage(
-                        "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[a]");
+                .satisfies(
+                        anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot drop/rename partition key[a]"));
     }
 
     @Test
@@ -261,9 +263,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                 .contains(showCreateString("testDropPrimaryKey", "a BIGINT", "b STRING"));
 
         assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPrimaryKey DROP COLUMN b"))
-                .isInstanceOf(RuntimeException.class)
-                .hasMessage(
-                        "java.lang.UnsupportedOperationException: Cannot drop/rename primary key[b]");
+                .satisfies(
+                        anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot drop/rename primary key[b]"));
     }
 
     @Test
@@ -294,7 +297,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         createTable("tableFirstSelf");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableFirstSelf ALTER COLUMN a FIRST"))
                 .satisfies(
-                        AssertionUtils.anyCauseMatches(
+                        anyCauseMatches(
                                 UnsupportedOperationException.class,
                                 "Cannot move itself for column a"));
 
@@ -302,7 +305,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         createTable("tableAfterSelf");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableAfterSelf ALTER COLUMN b AFTER b"))
                 .satisfies(
-                        AssertionUtils.anyCauseMatches(
+                        anyCauseMatches(
                                 UnsupportedOperationException.class,
                                 "Cannot move itself for column b"));
 
@@ -375,7 +378,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 spark.sql(
                                         "ALTER TABLE testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
                 .satisfies(
-                        AssertionUtils.anyCauseMatches(
+                        anyCauseMatches(
                                 UnsupportedOperationException.class,
                                 "Cannot change nullability of primary key"));
     }

Reply via email to