This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cc4d0835e0f flink-sql-iotdb-connector: add property cdc.mode in cdc
connector (#11758)
cc4d0835e0f is described below
commit cc4d0835e0f7f64ebb60a50868fada010d2ecac2
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Thu Dec 21 12:22:41 2023 +0800
flink-sql-iotdb-connector: add property cdc.mode in cdc connector (#11758)
---
.../org/apache/iotdb/flink/sql/common/Options.java | 7 ++++
.../sql/factory/IoTDBDynamicTableFactory.java | 1 +
.../flink/sql/function/IoTDBCDCSourceFunction.java | 47 ++++++++++++++++------
3 files changed, 42 insertions(+), 13 deletions(-)
diff --git
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
index 138b97065e3..40b8757736e 100644
---
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
+++
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
@@ -49,9 +49,16 @@ public class Options {
ConfigOptions.key("sql").stringType().noDefaultValue();
public static final ConfigOption<String> PATTERN =
ConfigOptions.key("cdc.pattern").stringType().noDefaultValue();
+ public static final ConfigOption<CDCMode> CDC_MODE =
+
ConfigOptions.key("cdc.mode").enumType(CDCMode.class).defaultValue(CDCMode.ALL);
public enum Mode {
CDC,
BOUNDED
}
+
+ public enum CDCMode {
+ REALTIME,
+ ALL
+ }
}
diff --git
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
index 705f3c0ac86..c32de37733e 100644
---
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
+++
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
@@ -94,6 +94,7 @@ public class IoTDBDynamicTableFactory
optionalOptions.add(Options.CDC_PORT);
optionalOptions.add(Options.SQL);
optionalOptions.add(Options.PATTERN);
+ optionalOptions.add(Options.CDC_MODE);
return optionalOptions;
}
diff --git
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
index d9b73f37534..58edd4d822f 100644
---
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
+++
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
@@ -70,6 +70,7 @@ public class IoTDBCDCSourceFunction extends
RichSourceFunction<RowData> {
private final BlockingQueue<TabletWrapper> tabletWrappers;
private final List<Tuple2<String, DataType>> tableSchema;
private final String pipeName;
+ private final Options.CDCMode mode;
private transient ExecutorService consumeExecutor;
public IoTDBCDCSourceFunction(ReadableConfig options, SchemaWrapper
schemaWrapper) {
@@ -83,6 +84,7 @@ public class IoTDBCDCSourceFunction extends
RichSourceFunction<RowData> {
password = options.get(Options.PASSWORD);
timeseriesList =
tableSchema.stream().map(field ->
String.valueOf(field.f0)).collect(Collectors.toList());
+ mode = options.get(Options.CDC_MODE);
tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size() * 10);
}
@@ -96,19 +98,38 @@ public class IoTDBCDCSourceFunction extends
RichSourceFunction<RowData> {
try (SessionDataSet dataSet =
session.executeQueryStatement(String.format("show pipe %s",
pipeName))) {
if (!dataSet.hasNext()) {
- String createPipeCommand =
- String.format(
- "CREATE PIPE %s\n"
- + "WITH EXTRACTOR (\n"
- + "'extractor' = 'iotdb-extractor',\n"
- + "'extractor.pattern' = '%s',\n"
- + ") WITH CONNECTOR (\n"
- + "'connector' = 'websocket-connector',\n"
- + "'connector.websocket.port' = '%d',\n"
- // avoid to reuse the pipe's connector
- + "'connector.websocket.id' = '%d'"
- + ")",
- pipeName, pattern, cdcPort, System.currentTimeMillis());
+ String createPipeCommand;
+ if (Options.CDCMode.REALTIME.equals(mode)) {
+ createPipeCommand =
+ String.format(
+ "CREATE PIPE %s\n"
+ + "WITH EXTRACTOR (\n"
+ + "'extractor' = 'iotdb-extractor',\n"
+ + "'extractor.history.enable' = 'false',\n"
+ + "'extractor.pattern' = '%s',\n"
+ + ") WITH CONNECTOR (\n"
+ + "'connector' = 'websocket-connector',\n"
+ + "'connector.websocket.port' = '%d',\n"
+ // avoid to reuse the pipe's connector
+ + "'connector.websocket.id' = '%d'"
+ + ")",
+ pipeName, pattern, cdcPort, System.currentTimeMillis());
+ } else {
+ createPipeCommand =
+ String.format(
+ "CREATE PIPE %s\n"
+ + "WITH EXTRACTOR (\n"
+ + "'extractor' = 'iotdb-extractor',\n"
+ + "'extractor.pattern' = '%s',\n"
+ + ") WITH CONNECTOR (\n"
+ + "'connector' = 'websocket-connector',\n"
+ + "'connector.websocket.port' = '%d',\n"
+ // avoid to reuse the pipe's connector
+ + "'connector.websocket.id' = '%d'"
+ + ")",
+ pipeName, pattern, cdcPort, System.currentTimeMillis());
+ }
+
session.executeNonQueryStatement(createPipeCommand);
session.executeNonQueryStatement(String.format("start pipe %s",
pipeName));
} else {