This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push: new 355b63535 [cdc] Renaming MySqlDatabaseSyncMode 355b63535 is described below commit 355b635351db2428a23a8a58f0ae6b131955ff17 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Mon Jul 3 14:51:15 2023 +0800 [cdc] Renaming MySqlDatabaseSyncMode --- .../action/cdc/mysql/MySqlDatabaseSyncMode.java | 13 +++++++---- .../action/cdc/mysql/MySqlSyncDatabaseAction.java | 12 ++++++---- .../org/apache/paimon/flink/sink/FlinkSink.java | 2 +- .../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 6 +++++ .../flink/sink/cdc/FlinkCdcMultiTableSink.java | 27 +++------------------- .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 13 +++++------ .../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 8 ++++--- 7 files changed, 36 insertions(+), 45 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java index 127c17c02..a7eccba66 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java @@ -21,11 +21,14 @@ package org.apache.paimon.flink.action.cdc.mysql; import java.io.Serializable; /** - * There are two modes for database sync. 1) STATIC mode, only write record from static tables. - * Newly added tables during runtime are not synced. 2) DYNAMIC mode, all records from static tables - * and newly added tables are routed into unified operator. + * There are two modes for database sync. + * + * <p>1) SEPARATE mode, start a sink for each table, the synchronization of the new table requires + * restarting the job. + * + * <p>2) UNIFIED mode, start a unified sink, the new table will be automatically synchronized. */ public enum MySqlDatabaseSyncMode implements Serializable { - STATIC, - DYNAMIC + SEPARATE, + UNIFIED } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index bc2abf91e..1fe314242 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -61,6 +61,8 @@ import java.util.stream.Collectors; import static org.apache.paimon.flink.action.Action.checkRequiredArgument; import static org.apache.paimon.flink.action.Action.optionalConfigMap; +import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE; +import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED; import static org.apache.paimon.utils.Preconditions.checkArgument; /** @@ -133,7 +135,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { null, catalogConfig, tableConfig, - MySqlDatabaseSyncMode.STATIC); + SEPARATE); } public MySqlSyncDatabaseAction( @@ -225,7 +227,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { + "MySQL database are not compatible with those of existed Paimon tables. Please check the log."); String tableList; - if (mode == MySqlDatabaseSyncMode.DYNAMIC) { + if (mode == UNIFIED) { // First excluding all tables that failed the excludingPattern and those does not // have a primary key. Then including other table using regex so that newly // added table DDLs and DMLs during job runtime will be captured @@ -377,10 +379,10 @@ public class MySqlSyncDatabaseAction extends ActionBase { String excludingTables = params.get("excluding-tables"); String mode = params.get("mode"); MySqlDatabaseSyncMode syncMode; - if ("dynamic".equalsIgnoreCase(mode)) { - syncMode = MySqlDatabaseSyncMode.DYNAMIC; + if ("unified".equalsIgnoreCase(mode)) { + syncMode = UNIFIED; } else { - syncMode = MySqlDatabaseSyncMode.STATIC; + syncMode = SEPARATE; } Map<String, String> mySqlConfig = optionalConfigMap(params, "mysql-conf"); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 94f7cf63b..3ad1cc0a7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -194,7 +194,7 @@ public abstract class FlinkSink<T> implements Serializable { return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); } - private void assertStreamingConfiguration(StreamExecutionEnvironment env) { + public static void assertStreamingConfiguration(StreamExecutionEnvironment env) { checkArgument( !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(), "Paimon sink currently does not support unaligned checkpoints. Please set " diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java index 9ef826784..6b9f922ae 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java @@ -27,6 +27,7 @@ import org.apache.paimon.flink.sink.StateUtils; import org.apache.paimon.flink.sink.StoreSinkWrite; import org.apache.paimon.flink.sink.StoreSinkWriteState; import org.apache.paimon.options.Options; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.runtime.state.StateInitializationContext; @@ -159,6 +160,11 @@ public class CdcRecordStoreMultiWriteOperator Thread.sleep(RETRY_SLEEP_TIME.defaultValue().toMillis()); } } + + if (table.bucketMode() != BucketMode.FIXED) { + throw new UnsupportedOperationException( + "Unified Sink only supports FIXED bucket mode, but is " + table.bucketMode()); + } return table; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 1291ce15b..c058225cb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -31,19 +31,13 @@ import org.apache.paimon.flink.sink.StoreMultiCommitter; import org.apache.paimon.flink.sink.StoreSinkWrite; import org.apache.paimon.flink.sink.StoreSinkWriteImpl; import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.manifest.WrappedManifestCommittable; import org.apache.paimon.options.Options; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SerializableFunction; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.CheckpointConfig; -import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -51,6 +45,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import java.io.Serializable; import java.util.UUID; +import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration; + /** * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema change if necessary. */ @@ -89,11 +85,7 @@ public class FlinkCdcMultiTableSink implements Serializable { String commitUser, StoreSinkWrite.Provider sinkProvider) { StreamExecutionEnvironment env = input.getExecutionEnvironment(); - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); - CheckpointConfig checkpointConfig = env.getCheckpointConfig(); - - assertCheckpointConfiguration(env); - + assertStreamingConfiguration(env); MultiTableCommittableTypeInfo typeInfo = new MultiTableCommittableTypeInfo(); SingleOutputStreamOperator<MultiTableCommittable> written = input.transform( @@ -116,19 +108,6 @@ public class FlinkCdcMultiTableSink implements Serializable { return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); } - private void assertCheckpointConfiguration(StreamExecutionEnvironment env) { - Preconditions.checkArgument( - !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(), - "Paimon sink currently does not support unaligned checkpoints. Please set " - + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key() - + " to false."); - Preconditions.checkArgument( - env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE, - "Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set " - + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key() - + " to exactly-once"); - } - protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator( StoreSinkWrite.Provider writeProvider, String commitUser) { return new CdcRecordStoreMultiWriteOperator( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index 9bf3f88a3..404fcbd87 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -29,7 +29,6 @@ import org.apache.paimon.utils.Preconditions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import javax.annotation.Nullable; @@ -37,6 +36,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED; import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; /** @@ -94,15 +94,14 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> { Preconditions.checkNotNull(input); Preconditions.checkNotNull(parserFactory); - StreamExecutionEnvironment env = input.getExecutionEnvironment(); - if (mode == MySqlDatabaseSyncMode.DYNAMIC) { - buildDynamicCdcSink(env); + if (mode == UNIFIED) { + buildUnifiedCdcSink(); } else { - buildStaticCdcSink(env); + buildStaticCdcSink(); } } - private void buildDynamicCdcSink(StreamExecutionEnvironment env) { + private void buildUnifiedCdcSink() { SingleOutputStreamOperator<Void> parsed = input.forward() .process( @@ -145,7 +144,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> { new FlinkCdcSink(table).sinkFrom(partitioned); } - private void buildStaticCdcSink(StreamExecutionEnvironment env) { + private void buildStaticCdcSink() { SingleOutputStreamOperator<Void> parsed = input.forward() .process(new CdcMultiTableParsingProcessFunction<>(parserFactory)) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index 44caea410..50b4611c4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -59,6 +59,8 @@ import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; +import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE; +import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -372,7 +374,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { null, Collections.emptyMap(), tableConfig, - MySqlDatabaseSyncMode.STATIC); + SEPARATE); action.build(env); JobClient client = env.executeAsync(); waitJobRunning(client); @@ -547,7 +549,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { excludingTables, Collections.emptyMap(), tableConfig, - MySqlDatabaseSyncMode.STATIC); + SEPARATE); action.build(env); JobClient client = env.executeAsync(); waitJobRunning(client); @@ -907,7 +909,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { null, Collections.emptyMap(), tableConfig, - MySqlDatabaseSyncMode.DYNAMIC); + UNIFIED); action.build(env); if (Objects.nonNull(savepointPath)) {