This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8538308e7c9 Pipe: Modify 
extractor.history.terminate-pipe-on-all-consumed -> extractor.mode. Make 
sink.realtime-first true by default. (#12605)
8538308e7c9 is described below

commit 8538308e7c9785aa700a8c7934d6f523a6ac8be6
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed May 29 17:43:27 2024 +0800

    Pipe: Modify extractor.history.terminate-pipe-on-all-consumed -> 
extractor.mode. Make sink.realtime-first true by default. (#12605)
---
 .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java    |  2 +-
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     |  3 ++-
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    |  4 ++--
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 27 +++++++++-------------
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 16 ++++++-------
 .../config/constant/PipeConnectorConstant.java     |  2 +-
 .../config/constant/PipeExtractorConstant.java     | 12 +++++-----
 7 files changed, 31 insertions(+), 35 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
index b223a245d0a..ecd0b2d5775 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -53,7 +53,7 @@ public class IoTDBPipeAutoDropIT extends 
AbstractPipeDualAutoIT {
       final Map<String, String> processorAttributes = new HashMap<>();
       final Map<String, String> connectorAttributes = new HashMap<>();
 
-      
extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed", 
"true");
+      extractorAttributes.put("extractor.mode", "query");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
       connectorAttributes.put("connector.batch.enable", "false");
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index 7ebf56c6035..aeab3f74463 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -917,7 +917,8 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
           Arrays.asList(
               // Test the correctness of insertRowsNode transmission
               "insert into root.db.d1(time, s1) values (-122, 3)",
-              "insert into root.db.d1(time, s1) values (-123, 3), (now(), 
3)"))) {
+              "insert into root.db.d1(time, s1) values (-123, 3), (now(), 3)",
+              "flush"))) {
         return;
       }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index d3bfe710aca..50a9b0d2ec5 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -43,7 +43,7 @@ import java.util.Map;
 @Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
   @Test
-  public void testThriftConnectorWithRealtimeFirst() throws Exception {
+  public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -70,7 +70,7 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
       connectorAttributes.put("connector.batch.enable", "false");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
-      connectorAttributes.put("connector.realtime-first", "true");
+      connectorAttributes.put("connector.realtime-first", "false");
 
       final TSStatus status =
           client.createPipe(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 9f3b7fe66ee..098c5b68cee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.pipe.task.PipeTask;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -81,10 +82,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
-
 public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
@@ -309,8 +306,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 || pipeTaskMap.entrySet().stream()
                     .filter(entry -> dataRegionIds.contains(entry.getKey()))
                     .allMatch(entry -> ((PipeDataNodeTask) 
entry.getValue()).isCompleted());
-        // If the "source.history.terminate-pipe-on-all-consumed" is false or 
the pipe does
-        // not include data transfer, we should not terminate the pipe.
         final boolean includeDataAndNeedDrop =
             
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
                         pipeMeta.getStaticMeta().getExtractorParameters())
@@ -318,11 +313,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 && pipeMeta
                     .getStaticMeta()
                     .getExtractorParameters()
-                    .getBooleanOrDefault(
+                    .getStringOrDefault(
                         Arrays.asList(
-                            SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
-                            
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
-                        
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+                            PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+                            PipeExtractorConstant.SOURCE_MODE_KEY),
+                        PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+                    
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
@@ -389,8 +385,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 || pipeTaskMap.entrySet().stream()
                     .filter(entry -> dataRegionIds.contains(entry.getKey()))
                     .allMatch(entry -> ((PipeDataNodeTask) 
entry.getValue()).isCompleted());
-        // If the "source.history.terminate-pipe-on-all-consumed" is false or 
the pipe does
-        // not include data transfer, we should not terminate the pipe.
         final boolean includeDataAndNeedDrop =
             
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
                         pipeMeta.getStaticMeta().getExtractorParameters())
@@ -398,11 +392,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 && pipeMeta
                     .getStaticMeta()
                     .getExtractorParameters()
-                    .getBooleanOrDefault(
+                    .getStringOrDefault(
                         Arrays.asList(
-                            SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
-                            
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
-                        
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+                            PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+                            PipeExtractorConstant.SOURCE_MODE_KEY),
+                        PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+                    
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 880d3f9f7f2..1c150dd5409 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 import 
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
@@ -65,8 +66,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
@@ -75,7 +74,6 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
 
@@ -197,11 +195,13 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                       .getRight());
 
       shouldTerminatePipeOnAllHistoricalEventsConsumed =
-          parameters.getBooleanOrDefault(
-              Arrays.asList(
-                  SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
-                  EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
-              EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+          parameters
+              .getStringOrDefault(
+                  Arrays.asList(
+                      PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+                      PipeExtractorConstant.SOURCE_MODE_KEY),
+                  PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+              
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
     } catch (final Exception e) {
       // Compatible with the current validation framework
       throw new PipeParameterNotValidException(e.getMessage());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 5e73c4d54d8..a7da7c3ef68 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -55,7 +55,7 @@ public class PipeConnectorConstant {
 
   public static final String CONNECTOR_REALTIME_FIRST_KEY = 
"connector.realtime-first";
   public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
-  public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = false;
+  public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true;
 
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
   public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = 
"sink.batch.enable";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index b5ee9c153dc..a01c24d3aec 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -32,6 +32,12 @@ public class PipeExtractorConstant {
   public static final String SOURCE_EXCLUSION_KEY = 
"source.inclusion.exclusion";
   public static final String EXTRACTOR_EXCLUSION_DEFAULT_VALUE = "";
 
+  public static final String EXTRACTOR_MODE_KEY = "extractor.mode";
+  public static final String SOURCE_MODE_KEY = "source.mode";
+  public static final String EXTRACTOR_MODE_QUERY_VALUE = "query";
+  public static final String EXTRACTOR_MODE_SUBSCRIBE_VALUE = "subscribe";
+  public static final String EXTRACTOR_MODE_DEFAULT_VALUE = 
EXTRACTOR_MODE_SUBSCRIBE_VALUE;
+
   public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
   public static final String SOURCE_PATTERN_KEY = "source.pattern";
   public static final String EXTRACTOR_PATH_KEY = "extractor.path";
@@ -61,12 +67,6 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_MODS_ENABLE_KEY = 
"extractor.mods.enable";
   public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable";
   public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;
-  public static final String 
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY =
-      "extractor.history.terminate-pipe-on-all-consumed";
-  public static final String SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY 
=
-      "source.history.terminate-pipe-on-all-consumed";
-  public static final boolean 
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE =
-      false;
 
   public static final String EXTRACTOR_REALTIME_ENABLE_KEY = 
"extractor.realtime.enable";
   public static final String SOURCE_REALTIME_ENABLE_KEY = 
"source.realtime.enable";

Reply via email to