This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-param-modification
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit da3bfa1542ba15cae0457202813215e8d0abe859
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue May 28 18:05:11 2024 +0800

    extractor.history.terminate-pipe-on-all-consumed -> extract.mode
---
 .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java    |  2 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 27 +++++++++-------------
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 16 ++++++-------
 .../config/constant/PipeExtractorConstant.java     | 12 +++++-----
 4 files changed, 26 insertions(+), 31 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
index b223a245d0a..ecd0b2d5775 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -53,7 +53,7 @@ public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
       final Map<String, String> processorAttributes = new HashMap<>();
       final Map<String, String> connectorAttributes = new HashMap<>();
 
-      extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed", "true");
+      extractorAttributes.put("extractor.mode", "query");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
       connectorAttributes.put("connector.batch.enable", "false");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 65c7699f398..112d91ff953 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.pipe.task.PipeTask;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -80,10 +81,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
-import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
-import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
-
 public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
@@ -305,8 +302,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 || pipeTaskMap.entrySet().stream()
                     .filter(entry -> dataRegionIds.contains(entry.getKey()))
                     .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted());
-        // If the "source.history.terminate-pipe-on-all-consumed" is false or the pipe does
-        // not include data transfer, we should not terminate the pipe.
         final boolean includeDataAndNeedDrop =
             DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
                         pipeMeta.getStaticMeta().getExtractorParameters())
@@ -314,11 +309,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 && pipeMeta
                     .getStaticMeta()
                     .getExtractorParameters()
-                    .getBooleanOrDefault(
+                    .getStringOrDefault(
                         Arrays.asList(
-                            SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
-                            EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
-                        EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+                            PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+                            PipeExtractorConstant.SOURCE_MODE_KEY),
+                        PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+                    .equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
 
         pipeCompletedList.add(isAllDataRegionCompleted && includeDataAndNeedDrop);
 
@@ -371,8 +367,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 || pipeTaskMap.entrySet().stream()
                     .filter(entry -> dataRegionIds.contains(entry.getKey()))
                     .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted());
-        // If the "source.history.terminate-pipe-on-all-consumed" is false or the pipe does
-        // not include data transfer, we should not terminate the pipe.
         final boolean includeDataAndNeedDrop =
             DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
                         pipeMeta.getStaticMeta().getExtractorParameters())
@@ -380,11 +374,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 && pipeMeta
                     .getStaticMeta()
                     .getExtractorParameters()
-                    .getBooleanOrDefault(
+                    .getStringOrDefault(
                         Arrays.asList(
-                            SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
-                            EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
-                        EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+                            PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+                            PipeExtractorConstant.SOURCE_MODE_KEY),
+                        PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+                    .equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
 
         pipeCompletedList.add(isAllDataRegionCompleted && includeDataAndNeedDrop);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 880d3f9f7f2..1c150dd5409 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
@@ -65,8 +66,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
-import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
@@ -75,7 +74,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
 
@@ -197,11 +195,13 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
                       .getRight());
 
       shouldTerminatePipeOnAllHistoricalEventsConsumed =
-          parameters.getBooleanOrDefault(
-              Arrays.asList(
-                  SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
-                  EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
-              EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+          parameters
+              .getStringOrDefault(
+                  Arrays.asList(
+                      PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+                      PipeExtractorConstant.SOURCE_MODE_KEY),
+                  PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+              .equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
     } catch (final Exception e) {
       // Compatible with the current validation framework
       throw new PipeParameterNotValidException(e.getMessage());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index b5ee9c153dc..c7d9aca95c5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -32,6 +32,12 @@ public class PipeExtractorConstant {
   public static final String SOURCE_EXCLUSION_KEY = "source.inclusion.exclusion";
   public static final String EXTRACTOR_EXCLUSION_DEFAULT_VALUE = "";
 
+  public static final String EXTRACTOR_MODE_KEY = "extractor.mode";
+  public static final String SOURCE_MODE_KEY = "source.mode";
+  public static final String EXTRACTOR_MODE_QUERY_VALUE = "query";
+  public static final String EXTRACTOR_MODE_SUBSCRIBE_VALUE = "subscribe";
+  public static final String EXTRACTOR_MODE_DEFAULT_VALUE = EXTRACTOR_MODE_SUBSCRIBE_VALUE;
+
   public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
   public static final String SOURCE_PATTERN_KEY = "source.pattern";
   public static final String EXTRACTOR_PATH_KEY = "extractor.path";
@@ -61,12 +67,6 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable";
   public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable";
   public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;
-  public static final String EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY =
-      "extractor.history.terminate-pipe-on-all-consumed";
-  public static final String SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY =
-      "source.history.terminate-pipe-on-all-consumed";
-  public static final boolean EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE =
-      false;
 
   public static final String EXTRACTOR_REALTIME_ENABLE_KEY = "extractor.realtime.enable";
   public static final String SOURCE_REALTIME_ENABLE_KEY = "source.realtime.enable";

Reply via email to