This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 355b63535 [cdc] Renaming MySqlDatabaseSyncMode
355b63535 is described below

commit 355b635351db2428a23a8a58f0ae6b131955ff17
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Mon Jul 3 14:51:15 2023 +0800

    [cdc] Renaming MySqlDatabaseSyncMode
---
 .../action/cdc/mysql/MySqlDatabaseSyncMode.java    | 13 +++++++----
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 12 ++++++----
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  2 +-
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java |  6 +++++
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     | 27 +++-------------------
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  | 13 +++++------
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   |  8 ++++---
 7 files changed, 36 insertions(+), 45 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
index 127c17c02..a7eccba66 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
@@ -21,11 +21,14 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import java.io.Serializable;
 
 /**
- * There are two modes for database sync. 1) STATIC mode, only write record from static tables.
- * Newly added tables during runtime are not synced. 2) DYNAMIC mode, all records from static tables
- * and newly added tables are routed into unified operator.
+ * There are two modes for database sync.
+ *
+ * <p>1) SEPARATE mode, start a sink for each table, the synchronization of the new table requires
+ * restarting the job.
+ *
+ * <p>2) UNIFIED mode, start a unified sink, the new table will be automatically synchronized.
  */
 public enum MySqlDatabaseSyncMode implements Serializable {
-    STATIC,
-    DYNAMIC
+    SEPARATE,
+    UNIFIED
 }
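
For readers of this patch, a minimal sketch of how the renamed constants map onto the
string "mode" argument; it mirrors the parsing change in MySqlSyncDatabaseAction further
down, and the parseMode helper name here is hypothetical, not part of the commit:

    import org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode;

    class SyncModeExample {
        // "unified" selects the single unified multi-table sink; anything else falls
        // back to one sink per table (SEPARATE), which was previously called STATIC.
        static MySqlDatabaseSyncMode parseMode(String mode) {
            return "unified".equalsIgnoreCase(mode)
                    ? MySqlDatabaseSyncMode.UNIFIED
                    : MySqlDatabaseSyncMode.SEPARATE;
        }
    }
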
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index bc2abf91e..1fe314242 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.action.Action.checkRequiredArgument;
 import static org.apache.paimon.flink.action.Action.optionalConfigMap;
+import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE;
+import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -133,7 +135,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                 null,
                 catalogConfig,
                 tableConfig,
-                MySqlDatabaseSyncMode.STATIC);
+                SEPARATE);
     }
 
     public MySqlSyncDatabaseAction(
@@ -225,7 +227,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         + "MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
         String tableList;
 
-        if (mode == MySqlDatabaseSyncMode.DYNAMIC) {
+        if (mode == UNIFIED) {
             // First excluding all tables that failed the excludingPattern and those does not
             //     have a primary key. Then including other table using regex so that newly
             //     added table DDLs and DMLs during job runtime will be captured
@@ -377,10 +379,10 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         String excludingTables = params.get("excluding-tables");
         String mode = params.get("mode");
         MySqlDatabaseSyncMode syncMode;
-        if ("dynamic".equalsIgnoreCase(mode)) {
-            syncMode = MySqlDatabaseSyncMode.DYNAMIC;
+        if ("unified".equalsIgnoreCase(mode)) {
+            syncMode = UNIFIED;
         } else {
-            syncMode = MySqlDatabaseSyncMode.STATIC;
+            syncMode = SEPARATE;
         }
 
         Map<String, String> mySqlConfig = optionalConfigMap(params, "mysql-conf");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 94f7cf63b..3ad1cc0a7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -194,7 +194,7 @@ public abstract class FlinkSink<T> implements Serializable {
         return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
     }
 
-    private void assertStreamingConfiguration(StreamExecutionEnvironment env) {
+    public static void assertStreamingConfiguration(StreamExecutionEnvironment env) {
         checkArgument(
                 !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
                 "Paimon sink currently does not support unaligned checkpoints. Please set "
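
As a usage aside, a job configuration that satisfies the now-shared
assertStreamingConfiguration check uses aligned, exactly-once checkpoints. A minimal
sketch with an arbitrary 60 second checkpoint interval (the interval is an example
value, not something this patch prescribes):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    class CheckpointSetupExample {
        static StreamExecutionEnvironment configuredEnv() {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Exactly-once, aligned checkpoints, as the assertion requires.
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().enableUnalignedCheckpoints(false);
            return env;
        }
    }
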
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 9ef826784..6b9f922ae 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.sink.StateUtils;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteState;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -159,6 +160,11 @@ public class CdcRecordStoreMultiWriteOperator
                 Thread.sleep(RETRY_SLEEP_TIME.defaultValue().toMillis());
             }
         }
+
+        if (table.bucketMode() != BucketMode.FIXED) {
+            throw new UnsupportedOperationException(
+                    "Unified Sink only supports FIXED bucket mode, but is " + table.bucketMode());
+        }
         return table;
     }
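
A note on the new guard above: only tables in FIXED bucket mode are accepted by the
unified multi-table sink. A standalone sketch of the same check, with a hypothetical
helper name, in case you want to validate tables up front:

    import org.apache.paimon.table.BucketMode;
    import org.apache.paimon.table.FileStoreTable;

    final class FixedBucketGuard {
        // Mirrors the check added in CdcRecordStoreMultiWriteOperator: reject any
        // table whose bucket mode is not FIXED before writing through the unified sink.
        static FileStoreTable requireFixedBucket(FileStoreTable table) {
            if (table.bucketMode() != BucketMode.FIXED) {
                throw new UnsupportedOperationException(
                        "Unified Sink only supports FIXED bucket mode, but is " + table.bucketMode());
            }
            return table;
        }
    }
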
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 1291ce15b..c058225cb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -31,19 +31,13 @@ import org.apache.paimon.flink.sink.StoreMultiCommitter;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableFunction;
 
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -51,6 +45,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import java.io.Serializable;
 import java.util.UUID;
 
+import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
+
 /**
  * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema change if necessary.
  */
@@ -89,11 +85,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
             String commitUser,
             StoreSinkWrite.Provider sinkProvider) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
-        ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
-        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
-
-        assertCheckpointConfiguration(env);
-
+        assertStreamingConfiguration(env);
         MultiTableCommittableTypeInfo typeInfo = new MultiTableCommittableTypeInfo();
         SingleOutputStreamOperator<MultiTableCommittable> written =
                 input.transform(
@@ -116,19 +108,6 @@ public class FlinkCdcMultiTableSink implements Serializable {
         return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
     }
 
-    private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
-        Preconditions.checkArgument(
-                !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
-                "Paimon sink currently does not support unaligned checkpoints. Please set "
-                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
-                        + " to false.");
-        Preconditions.checkArgument(
-                env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
-                "Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set "
-                        + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
-                        + " to exactly-once");
-    }
-
     protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new CdcRecordStoreMultiWriteOperator(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 9bf3f88a3..404fcbd87 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -29,7 +29,6 @@ import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 
 import javax.annotation.Nullable;
@@ -37,6 +36,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
 /**
@@ -94,15 +94,14 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         Preconditions.checkNotNull(input);
         Preconditions.checkNotNull(parserFactory);
 
-        StreamExecutionEnvironment env = input.getExecutionEnvironment();
-        if (mode == MySqlDatabaseSyncMode.DYNAMIC) {
-            buildDynamicCdcSink(env);
+        if (mode == UNIFIED) {
+            buildUnifiedCdcSink();
         } else {
-            buildStaticCdcSink(env);
+            buildStaticCdcSink();
         }
     }
 
-    private void buildDynamicCdcSink(StreamExecutionEnvironment env) {
+    private void buildUnifiedCdcSink() {
         SingleOutputStreamOperator<Void> parsed =
                 input.forward()
                         .process(
@@ -145,7 +144,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         new FlinkCdcSink(table).sinkFrom(partitioned);
     }
 
-    private void buildStaticCdcSink(StreamExecutionEnvironment env) {
+    private void buildStaticCdcSink() {
         SingleOutputStreamOperator<Void> parsed =
                 input.forward()
                         .process(new CdcMultiTableParsingProcessFunction<>(parserFactory))
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 44caea410..50b4611c4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -59,6 +59,8 @@ import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE;
+import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -372,7 +374,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         null,
                         Collections.emptyMap(),
                         tableConfig,
-                        MySqlDatabaseSyncMode.STATIC);
+                        SEPARATE);
         action.build(env);
         JobClient client = env.executeAsync();
         waitJobRunning(client);
@@ -547,7 +549,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         excludingTables,
                         Collections.emptyMap(),
                         tableConfig,
-                        MySqlDatabaseSyncMode.STATIC);
+                        SEPARATE);
         action.build(env);
         JobClient client = env.executeAsync();
         waitJobRunning(client);
@@ -907,7 +909,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         null,
                         Collections.emptyMap(),
                         tableConfig,
-                        MySqlDatabaseSyncMode.DYNAMIC);
+                        UNIFIED);
         action.build(env);
 
         if (Objects.nonNull(savepointPath)) {
