This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d6a0a088b [flink] Change the return value of buildSource in SyncTableActionBase (#2342)
d6a0a088b is described below

commit d6a0a088b8898ff671c4a043364f219b466980ae
Author: Xiaojian Sun <[email protected]>
AuthorDate: Mon Nov 20 07:46:49 2023 +0800

    [flink] Change the return value of buildSource in SyncTableActionBase (#2342)
---
 .../paimon/flink/action/cdc/SyncTableActionBase.java  | 19 +++++++++++++++----
 .../flink/action/cdc/kafka/KafkaSyncTableAction.java  |  6 +++---
 .../action/cdc/mongodb/MongoDBSyncTableAction.java    |  7 ++++---
 .../flink/action/cdc/mysql/MySqlSyncTableAction.java  |  6 +++---
 .../action/cdc/pulsar/PulsarSyncTableAction.java      |  6 +++---
 5 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index a86534631..82510f15a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -36,6 +36,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,7 +149,7 @@ public abstract class SyncTableActionBase extends ActionBase {
                 true);
     }
 
-    protected abstract Source<String, ?, ?> buildSource() throws Exception;
+    protected abstract DataStreamSource<String> buildSource() throws Exception;
 
     protected abstract String sourceName();
 
@@ -194,9 +196,7 @@ public abstract class SyncTableActionBase extends ActionBase {
         checkComputedColumns(computedColumns);
 
         DataStream<RichCdcMultiplexRecord> input =
-                env.fromSource(buildSource(), WatermarkStrategy.noWatermarks(), sourceName())
-                        .flatMap(recordParse())
-                        .name("Parse");
+                buildSource().flatMap(recordParse()).name("Parse");
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
                 () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
 
@@ -214,6 +214,17 @@ public abstract class SyncTableActionBase extends ActionBase {
         sinkBuilder.build();
     }
 
+    protected DataStreamSource<String> buildDataStreamSource(Object source) {
+        if (source instanceof Source) {
+            return env.fromSource(
+                    (Source<String, ?, ?>) source, WatermarkStrategy.noWatermarks(), sourceName());
+        }
+        if (source instanceof SourceFunction) {
+            return env.addSource((SourceFunction<String>) source, sourceName());
+        }
+        throw new UnsupportedOperationException("Unrecognized source type");
+    }
+
     protected void validateCaseInsensitive(boolean caseSensitive) {
         AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
         AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table", table);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 57e471b80..0e90bda54 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -22,7 +22,7 @@ import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
 
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
 
 import java.util.Map;
@@ -40,8 +40,8 @@ public class KafkaSyncTableAction extends MessageQueueSyncTableActionBase {
     }
 
     @Override
-    protected Source<String, ?, ?> buildSource() {
-        return KafkaActionUtils.buildKafkaSource(cdcSourceConfig);
+    protected DataStreamSource<String> buildSource() {
+        return buildDataStreamSource(KafkaActionUtils.buildKafkaSource(cdcSourceConfig));
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index d88a7ff95..08aacf9cc 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -24,7 +24,7 @@ import org.apache.paimon.schema.Schema;
 
 import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 
 import java.util.Map;
 
@@ -76,12 +76,13 @@ public class MongoDBSyncTableAction extends SyncTableActionBase {
     }
 
     @Override
-    protected Source<String, ?, ?> buildSource() throws Exception {
+    protected DataStreamSource<String> buildSource() throws Exception {
         String tableList =
                 cdcSourceConfig.get(MongoDBSourceOptions.DATABASE)
                         + "\\."
                         + cdcSourceConfig.get(MongoDBSourceOptions.COLLECTION);
-        return MongoDBActionUtils.buildMongodbSource(cdcSourceConfig, tableList);
+        return buildDataStreamSource(
+                MongoDBActionUtils.buildMongodbSource(cdcSourceConfig, tableList));
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index f3d3aec4b..4d0f05664 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -29,7 +29,7 @@ import org.apache.paimon.schema.Schema;
 
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -114,12 +114,12 @@ public class MySqlSyncTableAction extends SyncTableActionBase {
     }
 
     @Override
-    protected Source<String, ?, ?> buildSource() throws Exception {
+    protected DataStreamSource<String> buildSource() throws Exception {
         String tableList =
                 mySqlSchemasInfo.pkTables().stream()
                         .map(i -> i.getDatabaseName() + "\\." + i.getObjectName())
                         .collect(Collectors.joining("|"));
-        return MySqlActionUtils.buildMySqlSource(cdcSourceConfig, tableList);
+        return buildDataStreamSource(MySqlActionUtils.buildMySqlSource(cdcSourceConfig, tableList));
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
index aa3908731..d21713828 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
@@ -22,7 +22,7 @@ import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
 import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
 
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.pulsar.client.api.PulsarClientException;
 
 import java.util.Map;
@@ -40,8 +40,8 @@ public class PulsarSyncTableAction extends MessageQueueSyncTableActionBase {
     }
 
     @Override
-    protected Source<String, ?, ?> buildSource() {
-        return PulsarActionUtils.buildPulsarSource(cdcSourceConfig);
+    protected DataStreamSource<String> buildSource() {
+        return buildDataStreamSource(PulsarActionUtils.buildPulsarSource(cdcSourceConfig));
     }
 
     @Override

Reply via email to