This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d6a0a088b [flink] Change the return value of buildSource in
SyncTableActionBase (#2342)
d6a0a088b is described below
commit d6a0a088b8898ff671c4a043364f219b466980ae
Author: Xiaojian Sun <[email protected]>
AuthorDate: Mon Nov 20 07:46:49 2023 +0800
[flink] Change the return value of buildSource in SyncTableActionBase
(#2342)
---
.../paimon/flink/action/cdc/SyncTableActionBase.java | 19 +++++++++++++++----
.../flink/action/cdc/kafka/KafkaSyncTableAction.java | 6 +++---
.../action/cdc/mongodb/MongoDBSyncTableAction.java | 7 ++++---
.../flink/action/cdc/mysql/MySqlSyncTableAction.java | 6 +++---
.../action/cdc/pulsar/PulsarSyncTableAction.java | 6 +++---
5 files changed, 28 insertions(+), 16 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index a86534631..82510f15a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -36,6 +36,8 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,7 +149,7 @@ public abstract class SyncTableActionBase extends
ActionBase {
true);
}
- protected abstract Source<String, ?, ?> buildSource() throws Exception;
+ protected abstract DataStreamSource<String> buildSource() throws Exception;
protected abstract String sourceName();
@@ -194,9 +196,7 @@ public abstract class SyncTableActionBase extends
ActionBase {
checkComputedColumns(computedColumns);
DataStream<RichCdcMultiplexRecord> input =
- env.fromSource(buildSource(), WatermarkStrategy.noWatermarks(), sourceName())
- .flatMap(recordParse())
- .name("Parse");
+ buildSource().flatMap(recordParse()).name("Parse");
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
() -> new RichCdcMultiplexRecordEventParser(caseSensitive);
@@ -214,6 +214,17 @@ public abstract class SyncTableActionBase extends
ActionBase {
sinkBuilder.build();
}
+ protected DataStreamSource<String> buildDataStreamSource(Object source) {
+ if (source instanceof Source) {
+ return env.fromSource(
+ (Source<String, ?, ?>) source, WatermarkStrategy.noWatermarks(), sourceName());
+ }
+ if (source instanceof SourceFunction) {
+ return env.addSource((SourceFunction<String>) source, sourceName());
+ }
+ throw new UnsupportedOperationException("Unrecognized source type");
+ }
+
protected void validateCaseInsensitive(boolean caseSensitive) {
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database",
database);
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table", table);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 57e471b80..0e90bda54 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import java.util.Map;
@@ -40,8 +40,8 @@ public class KafkaSyncTableAction extends
MessageQueueSyncTableActionBase {
}
@Override
- protected Source<String, ?, ?> buildSource() {
- return KafkaActionUtils.buildKafkaSource(cdcSourceConfig);
+ protected DataStreamSource<String> buildSource() {
+ return buildDataStreamSource(KafkaActionUtils.buildKafkaSource(cdcSourceConfig));
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index d88a7ff95..08aacf9cc 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -24,7 +24,7 @@ import org.apache.paimon.schema.Schema;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import java.util.Map;
@@ -76,12 +76,13 @@ public class MongoDBSyncTableAction extends
SyncTableActionBase {
}
@Override
- protected Source<String, ?, ?> buildSource() throws Exception {
+ protected DataStreamSource<String> buildSource() throws Exception {
String tableList =
cdcSourceConfig.get(MongoDBSourceOptions.DATABASE)
+ "\\."
+ cdcSourceConfig.get(MongoDBSourceOptions.COLLECTION);
- return MongoDBActionUtils.buildMongodbSource(cdcSourceConfig, tableList);
+ return buildDataStreamSource(
+ MongoDBActionUtils.buildMongodbSource(cdcSourceConfig, tableList));
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index f3d3aec4b..4d0f05664 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -29,7 +29,7 @@ import org.apache.paimon.schema.Schema;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import java.util.ArrayList;
import java.util.List;
@@ -114,12 +114,12 @@ public class MySqlSyncTableAction extends
SyncTableActionBase {
}
@Override
- protected Source<String, ?, ?> buildSource() throws Exception {
+ protected DataStreamSource<String> buildSource() throws Exception {
String tableList =
mySqlSchemasInfo.pkTables().stream()
.map(i -> i.getDatabaseName() + "\\." +
i.getObjectName())
.collect(Collectors.joining("|"));
- return MySqlActionUtils.buildMySqlSource(cdcSourceConfig, tableList);
+ return buildDataStreamSource(MySqlActionUtils.buildMySqlSource(cdcSourceConfig, tableList));
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
index aa3908731..d21713828 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
-import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.Map;
@@ -40,8 +40,8 @@ public class PulsarSyncTableAction extends
MessageQueueSyncTableActionBase {
}
@Override
- protected Source<String, ?, ?> buildSource() {
- return PulsarActionUtils.buildPulsarSource(cdcSourceConfig);
+ protected DataStreamSource<String> buildSource() {
+ return buildDataStreamSource(PulsarActionUtils.buildPulsarSource(cdcSourceConfig));
}
@Override