This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cc4d0835e0f flink-sql-iotdb-connector: add property cdc.mode in cdc connector (#11758)
cc4d0835e0f is described below

commit cc4d0835e0f7f64ebb60a50868fada010d2ecac2
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Thu Dec 21 12:22:41 2023 +0800

    flink-sql-iotdb-connector: add property cdc.mode in cdc connector (#11758)
---
 .../org/apache/iotdb/flink/sql/common/Options.java |  7 ++++
 .../sql/factory/IoTDBDynamicTableFactory.java      |  1 +
 .../flink/sql/function/IoTDBCDCSourceFunction.java | 47 ++++++++++++++++------
 3 files changed, 42 insertions(+), 13 deletions(-)

diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
index 138b97065e3..40b8757736e 100644
--- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/common/Options.java
@@ -49,9 +49,16 @@ public class Options {
       ConfigOptions.key("sql").stringType().noDefaultValue();
   public static final ConfigOption<String> PATTERN =
       ConfigOptions.key("cdc.pattern").stringType().noDefaultValue();
+  public static final ConfigOption<CDCMode> CDC_MODE =
+      ConfigOptions.key("cdc.mode").enumType(CDCMode.class).defaultValue(CDCMode.ALL);
 
   public enum Mode {
     CDC,
     BOUNDED
   }
+
+  public enum CDCMode {
+    REALTIME,
+    ALL
+  }
 }
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
index 705f3c0ac86..c32de37733e 100644
--- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
@@ -94,6 +94,7 @@ public class IoTDBDynamicTableFactory
     optionalOptions.add(Options.CDC_PORT);
     optionalOptions.add(Options.SQL);
     optionalOptions.add(Options.PATTERN);
+    optionalOptions.add(Options.CDC_MODE);
 
     return optionalOptions;
   }
diff --git a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
index d9b73f37534..58edd4d822f 100644
--- a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
+++ b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
@@ -70,6 +70,7 @@ public class IoTDBCDCSourceFunction extends RichSourceFunction<RowData> {
   private final BlockingQueue<TabletWrapper> tabletWrappers;
   private final List<Tuple2<String, DataType>> tableSchema;
   private final String pipeName;
+  private final Options.CDCMode mode;
   private transient ExecutorService consumeExecutor;
 
   public IoTDBCDCSourceFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
@@ -83,6 +84,7 @@ public class IoTDBCDCSourceFunction extends RichSourceFunction<RowData> {
     password = options.get(Options.PASSWORD);
     timeseriesList =
         tableSchema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
+    mode = options.get(Options.CDC_MODE);
 
     tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size() * 10);
   }
@@ -96,19 +98,38 @@ public class IoTDBCDCSourceFunction extends RichSourceFunction<RowData> {
     try (SessionDataSet dataSet =
         session.executeQueryStatement(String.format("show pipe %s", pipeName))) {
       if (!dataSet.hasNext()) {
-        String createPipeCommand =
-            String.format(
-                "CREATE PIPE %s\n"
-                    + "WITH EXTRACTOR (\n"
-                    + "'extractor' = 'iotdb-extractor',\n"
-                    + "'extractor.pattern' = '%s',\n"
-                    + ") WITH CONNECTOR (\n"
-                    + "'connector' = 'websocket-connector',\n"
-                    + "'connector.websocket.port' = '%d',\n"
-                    // avoid to reuse the pipe's connector
-                    + "'connector.websocket.id' = '%d'"
-                    + ")",
-                pipeName, pattern, cdcPort, System.currentTimeMillis());
+        String createPipeCommand;
+        if (Options.CDCMode.REALTIME.equals(mode)) {
+          createPipeCommand =
+              String.format(
+                  "CREATE PIPE %s\n"
+                      + "WITH EXTRACTOR (\n"
+                      + "'extractor' = 'iotdb-extractor',\n"
+                      + "'extractor.history.enable' = 'false',\n"
+                      + "'extractor.pattern' = '%s',\n"
+                      + ") WITH CONNECTOR (\n"
+                      + "'connector' = 'websocket-connector',\n"
+                      + "'connector.websocket.port' = '%d',\n"
+                      // avoid to reuse the pipe's connector
+                      + "'connector.websocket.id' = '%d'"
+                      + ")",
+                  pipeName, pattern, cdcPort, System.currentTimeMillis());
+        } else {
+          createPipeCommand =
+              String.format(
+                  "CREATE PIPE %s\n"
+                      + "WITH EXTRACTOR (\n"
+                      + "'extractor' = 'iotdb-extractor',\n"
+                      + "'extractor.pattern' = '%s',\n"
+                      + ") WITH CONNECTOR (\n"
+                      + "'connector' = 'websocket-connector',\n"
+                      + "'connector.websocket.port' = '%d',\n"
+                      // avoid to reuse the pipe's connector
+                      + "'connector.websocket.id' = '%d'"
+                      + ")",
+                  pipeName, pattern, cdcPort, System.currentTimeMillis());
+        }
+
         session.executeNonQueryStatement(createPipeCommand);
         session.executeNonQueryStatement(String.format("start pipe %s", pipeName));
       } else {

Reply via email to