This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new fe0b45a213b fixup! Pipe: split full-sync pipe into history and realtime pipes (#16250) (#16316) (#16317)
fe0b45a213b is described below
commit fe0b45a213b2cc92dd58b7be59b6eea57e9305da
Author: VGalaxies <[email protected]>
AuthorDate: Tue Sep 2 15:29:17 2025 +0800
fixup! Pipe: split full-sync pipe into history and realtime pipes (#16250) (#16316) (#16317)
---
.../config/executor/ClusterConfigTaskExecutor.java | 40 +++++++++++-----------
1 file changed, 20 insertions(+), 20 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b1af7cee4d3..9dea7d92193 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1808,58 +1808,58 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
&& PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- // 1. Send request to create the historical data synchronization
pipeline
- final TCreatePipeReq historyReq =
+ // 1. Send request to create the real-time data synchronization
pipeline
+ final TCreatePipeReq realtimeReq =
new TCreatePipeReq()
- // Append suffix to the pipeline name for historical data
- .setPipeName(createPipeStatement.getPipeName() + "_history")
+ // Append suffix to the pipeline name for real-time data
+ .setPipeName(createPipeStatement.getPipeName() + "_realtime")
// NOTE: set if not exists always to true to handle partial
failure
.setIfNotExistsCondition(true)
- // Use extractor parameters for historical data
+ // Use extractor parameters for real-time data
.setExtractorAttributes(
extractorPipeParameters
.addOrReplaceEquivalentAttributesWithClone(
new PipeParameters(
ImmutableMap.of(
-
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
- Boolean.toString(true),
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+ Boolean.toString(true),
+
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
Boolean.toString(false))))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
- final TSStatus historyTsStatus =
configNodeClient.createPipe(historyReq);
+ final TSStatus realtimeTsStatus =
configNodeClient.createPipe(realtimeReq);
// If creation fails, immediately return with exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
historyTsStatus.getCode()) {
- future.setException(new IoTDBException(historyTsStatus));
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
realtimeTsStatus.getCode()) {
+ future.setException(new IoTDBException(realtimeTsStatus));
return future;
}
- // 2. Send request to create the real-time data synchronization
pipeline
- final TCreatePipeReq realtimeReq =
+ // 2. Send request to create the historical data synchronization
pipeline
+ final TCreatePipeReq historyReq =
new TCreatePipeReq()
- // Append suffix to the pipeline name for real-time data
- .setPipeName(createPipeStatement.getPipeName() + "_realtime")
+ // Append suffix to the pipeline name for historical data
+ .setPipeName(createPipeStatement.getPipeName() + "_history")
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
- // Use extractor parameters for real-time data
+ // Use extractor parameters for historical data
.setExtractorAttributes(
extractorPipeParameters
.addOrReplaceEquivalentAttributesWithClone(
new PipeParameters(
ImmutableMap.of(
-
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
- Boolean.toString(false),
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+ Boolean.toString(false),
+
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
Boolean.toString(true))))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
- final TSStatus realtimeTsStatus =
configNodeClient.createPipe(realtimeReq);
+ final TSStatus historyTsStatus =
configNodeClient.createPipe(historyReq);
// If creation fails, immediately return with exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
realtimeTsStatus.getCode()) {
- future.setException(new IoTDBException(realtimeTsStatus));
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
historyTsStatus.getCode()) {
+ future.setException(new IoTDBException(historyTsStatus));
return future;
}