This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8538308e7c9 Pipe: Modify
extractor.history.terminate-pipe-on-all-consumed -> extractor.mode. Make
sink.realtime-first true by default. (#12605)
8538308e7c9 is described below
commit 8538308e7c9785aa700a8c7934d6f523a6ac8be6
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed May 29 17:43:27 2024 +0800
Pipe: Modify extractor.history.terminate-pipe-on-all-consumed ->
extractor.mode. Make sink.realtime-first true by default. (#12605)
---
.../pipe/it/autocreate/IoTDBPipeAutoDropIT.java | 2 +-
.../pipe/it/autocreate/IoTDBPipeClusterIT.java | 3 ++-
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 4 ++--
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 27 +++++++++-------------
.../PipeHistoricalDataRegionTsFileExtractor.java | 16 ++++++-------
.../config/constant/PipeConnectorConstant.java | 2 +-
.../config/constant/PipeExtractorConstant.java | 12 +++++-----
7 files changed, 31 insertions(+), 35 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
index b223a245d0a..ecd0b2d5775 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -53,7 +53,7 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeDualAutoIT {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
-
extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed",
"true");
+ extractorAttributes.put("extractor.mode", "query");
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index 7ebf56c6035..aeab3f74463 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -917,7 +917,8 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualAutoIT {
Arrays.asList(
// Test the correctness of insertRowsNode transmission
"insert into root.db.d1(time, s1) values (-122, 3)",
- "insert into root.db.d1(time, s1) values (-123, 3), (now(),
3)"))) {
+ "insert into root.db.d1(time, s1) values (-123, 3), (now(), 3)",
+ "flush"))) {
return;
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index d3bfe710aca..50a9b0d2ec5 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -43,7 +43,7 @@ import java.util.Map;
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
@Test
- public void testThriftConnectorWithRealtimeFirst() throws Exception {
+ public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -70,7 +70,7 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
- connectorAttributes.put("connector.realtime-first", "true");
+ connectorAttributes.put("connector.realtime-first", "false");
final TSStatus status =
client.createPipe(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 9f3b7fe66ee..098c5b68cee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -81,10 +82,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
-
public class PipeDataNodeTaskAgent extends PipeTaskAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
@@ -309,8 +306,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
|| pipeTaskMap.entrySet().stream()
.filter(entry -> dataRegionIds.contains(entry.getKey()))
.allMatch(entry -> ((PipeDataNodeTask)
entry.getValue()).isCompleted());
- // If the "source.history.terminate-pipe-on-all-consumed" is false or
the pipe does
- // not include data transfer, we should not terminate the pipe.
final boolean includeDataAndNeedDrop =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
pipeMeta.getStaticMeta().getExtractorParameters())
@@ -318,11 +313,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
&& pipeMeta
.getStaticMeta()
.getExtractorParameters()
- .getBooleanOrDefault(
+ .getStringOrDefault(
Arrays.asList(
- SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
-
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
-
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+ PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+ PipeExtractorConstant.SOURCE_MODE_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
@@ -389,8 +385,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
|| pipeTaskMap.entrySet().stream()
.filter(entry -> dataRegionIds.contains(entry.getKey()))
.allMatch(entry -> ((PipeDataNodeTask)
entry.getValue()).isCompleted());
- // If the "source.history.terminate-pipe-on-all-consumed" is false or
the pipe does
- // not include data transfer, we should not terminate the pipe.
final boolean includeDataAndNeedDrop =
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
pipeMeta.getStaticMeta().getExtractorParameters())
@@ -398,11 +392,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
&& pipeMeta
.getStaticMeta()
.getExtractorParameters()
- .getBooleanOrDefault(
+ .getStringOrDefault(
Arrays.asList(
- SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
-
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
-
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+ PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+ PipeExtractorConstant.SOURCE_MODE_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
final Pair<Long, Double> remainingEventAndTime =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 880d3f9f7f2..1c150dd5409 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
@@ -65,8 +66,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
@@ -75,7 +74,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
@@ -197,11 +195,13 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
.getRight());
shouldTerminatePipeOnAllHistoricalEventsConsumed =
- parameters.getBooleanOrDefault(
- Arrays.asList(
- SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
- EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
- EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ PipeExtractorConstant.EXTRACTOR_MODE_KEY,
+ PipeExtractorConstant.SOURCE_MODE_KEY),
+ PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
+
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
} catch (final Exception e) {
// Compatible with the current validation framework
throw new PipeParameterNotValidException(e.getMessage());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 5e73c4d54d8..a7da7c3ef68 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -55,7 +55,7 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_REALTIME_FIRST_KEY =
"connector.realtime-first";
public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
- public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = false;
+ public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = true;
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY =
"connector.batch.enable";
public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY =
"sink.batch.enable";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index b5ee9c153dc..a01c24d3aec 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -32,6 +32,12 @@ public class PipeExtractorConstant {
public static final String SOURCE_EXCLUSION_KEY =
"source.inclusion.exclusion";
public static final String EXTRACTOR_EXCLUSION_DEFAULT_VALUE = "";
+ public static final String EXTRACTOR_MODE_KEY = "extractor.mode";
+ public static final String SOURCE_MODE_KEY = "source.mode";
+ public static final String EXTRACTOR_MODE_QUERY_VALUE = "query";
+ public static final String EXTRACTOR_MODE_SUBSCRIBE_VALUE = "subscribe";
+ public static final String EXTRACTOR_MODE_DEFAULT_VALUE =
EXTRACTOR_MODE_SUBSCRIBE_VALUE;
+
public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
public static final String SOURCE_PATTERN_KEY = "source.pattern";
public static final String EXTRACTOR_PATH_KEY = "extractor.path";
@@ -61,12 +67,6 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_MODS_ENABLE_KEY =
"extractor.mods.enable";
public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable";
public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;
- public static final String
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY =
- "extractor.history.terminate-pipe-on-all-consumed";
- public static final String SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY
=
- "source.history.terminate-pipe-on-all-consumed";
- public static final boolean
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE =
- false;
public static final String EXTRACTOR_REALTIME_ENABLE_KEY =
"extractor.realtime.enable";
public static final String SOURCE_REALTIME_ENABLE_KEY =
"source.realtime.enable";