This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-param-modification in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit da3bfa1542ba15cae0457202813215e8d0abe859 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue May 28 18:05:11 2024 +0800 extractor.history.terminate-pipe-on-all-consumed -> extract.mode --- .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java | 2 +- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 27 +++++++++------------- .../PipeHistoricalDataRegionTsFileExtractor.java | 16 ++++++------- .../config/constant/PipeExtractorConstant.java | 12 +++++----- 4 files changed, 26 insertions(+), 31 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java index b223a245d0a..ecd0b2d5775 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java @@ -53,7 +53,7 @@ public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT { final Map<String, String> processorAttributes = new HashMap<>(); final Map<String, String> connectorAttributes = new HashMap<>(); - extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed", "true"); + extractorAttributes.put("extractor.mode", "query"); connectorAttributes.put("connector", "iotdb-thrift-connector"); connectorAttributes.put("connector.batch.enable", "false"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 65c7699f398..112d91ff953 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.task.PipeTask; import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta; @@ -80,10 +81,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY; - public class PipeDataNodeTaskAgent extends PipeTaskAgent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeTaskAgent.class); @@ -305,8 +302,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { || pipeTaskMap.entrySet().stream() .filter(entry -> dataRegionIds.contains(entry.getKey())) .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); - // If the "source.history.terminate-pipe-on-all-consumed" is false or the pipe does - // not include data transfer, we should not terminate the pipe. final boolean includeDataAndNeedDrop = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( pipeMeta.getStaticMeta().getExtractorParameters()) @@ -314,11 +309,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { && pipeMeta .getStaticMeta() .getExtractorParameters() - .getBooleanOrDefault( + .getStringOrDefault( Arrays.asList( - SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, - EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY), - EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE); + PipeExtractorConstant.EXTRACTOR_MODE_KEY, + PipeExtractorConstant.SOURCE_MODE_KEY), + PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE) + .equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE); pipeCompletedList.add(isAllDataRegionCompleted && includeDataAndNeedDrop); @@ -371,8 +367,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { || pipeTaskMap.entrySet().stream() .filter(entry -> dataRegionIds.contains(entry.getKey())) .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); - // If the "source.history.terminate-pipe-on-all-consumed" is false or the pipe does - // not include data transfer, we should not terminate the pipe. final boolean includeDataAndNeedDrop = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( pipeMeta.getStaticMeta().getExtractorParameters()) @@ -380,11 +374,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { && pipeMeta .getStaticMeta() .getExtractorParameters() - .getBooleanOrDefault( + .getStringOrDefault( Arrays.asList( - SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, - EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY), - EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE); + PipeExtractorConstant.EXTRACTOR_MODE_KEY, + PipeExtractorConstant.SOURCE_MODE_KEY), + PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE) + .equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE); pipeCompletedList.add(isAllDataRegionCompleted && includeDataAndNeedDrop); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 880d3f9f7f2..1c150dd5409 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.pattern.PipePattern; @@ -65,8 +66,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; @@ -75,7 +74,6 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; @@ -197,11 +195,13 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa .getRight()); shouldTerminatePipeOnAllHistoricalEventsConsumed = - parameters.getBooleanOrDefault( - Arrays.asList( - SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY, - EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY), - EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE); + parameters + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_MODE_KEY, + PipeExtractorConstant.SOURCE_MODE_KEY), + PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE) + .equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE); } catch (final Exception e) { // Compatible with the current validation framework throw new PipeParameterNotValidException(e.getMessage()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index b5ee9c153dc..c7d9aca95c5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -32,6 +32,12 @@ public class PipeExtractorConstant { public static final String SOURCE_EXCLUSION_KEY = "source.inclusion.exclusion"; public static final String EXTRACTOR_EXCLUSION_DEFAULT_VALUE = ""; + public static final String EXTRACTOR_MODE_KEY = "extract.mode"; + public static final String SOURCE_MODE_KEY = "source.mode"; + public static final String EXTRACTOR_MODE_QUERY_VALUE = "query"; + public static final String EXTRACTOR_MODE_SUBSCRIBE_VALUE = "subscribe"; + public static final String EXTRACTOR_MODE_DEFAULT_VALUE = EXTRACTOR_MODE_SUBSCRIBE_VALUE; + public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern"; public static final String SOURCE_PATTERN_KEY = "source.pattern"; public static final String EXTRACTOR_PATH_KEY = "extractor.path"; @@ -61,12 +67,6 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable"; public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable"; public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false; - public static final String EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY = - "extractor.history.terminate-pipe-on-all-consumed"; - public static final String SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY = - "source.history.terminate-pipe-on-all-consumed"; - public static final boolean EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE = - false; public static final String EXTRACTOR_REALTIME_ENABLE_KEY = "extractor.realtime.enable"; public static final String SOURCE_REALTIME_ENABLE_KEY = "source.realtime.enable";
