This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch fix-historical-default
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fix-historical-default by this
push:
new 5fdf5e693dc fix
5fdf5e693dc is described below
commit 5fdf5e693dc15a2b6426cd7e97ed88b647d03d91
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 24 14:32:20 2026 +0800
fix
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 7 ++-
.../config/executor/ClusterConfigTaskExecutor.java | 52 +++++++++++-----------
2 files changed, 31 insertions(+), 28 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0f8b9446d60..c2c1bbd6f96 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -584,7 +584,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
- public boolean isFullSync(final PipeParameters parameters) {
+ public boolean isFullSync(final PipeParameters parameters) throws IllegalPathException {
if (isSnapshotMode(parameters)) {
return false;
}
@@ -598,7 +598,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
- return isHistoryEnable && isRealtimeEnable;
+ return isHistoryEnable
+ && isRealtimeEnable
+ && DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+ .getRight();
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d28edddb2f9..d0627351b27 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -181,6 +181,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -2205,10 +2206,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
new PipeParameters(createPipeStatement.getSourceAttributes());
final PipeParameters sinkPipeParameters =
new PipeParameters(createPipeStatement.getSinkAttributes());
- if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
- && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
- try (final ConfigNodeClient configNodeClient =
- CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ try (final ConfigNodeClient configNodeClient =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+ && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
// 1. Send request to create the real-time data synchronization pipeline
final TCreatePipeReq realtimeReq =
new TCreatePipeReq()
@@ -2253,11 +2254,17 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
Boolean.toString(false),
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
Boolean.toString(true),
- // We force the historical pipe to transfer data only
+ // We force the historical pipe to transfer data (and maybe
+ // deletion) only
// Thus we can transfer schema only once
// And may drop the historical pipe on successfully transferred
PipeSourceConstant.SOURCE_INCLUSION_KEY,
- PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
+ DataRegionListeningFilter
+ .parseInsertionDeletionListeningOptionPair(
+ sourcePipeParameters)
+ .getRight()
+ ? "data"
+ : PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
PipeSourceConstant.SOURCE_EXCLUSION_KEY,
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
@@ -2280,27 +2287,20 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
// 3. Set success status only if both pipelines are created successfully
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- } catch (final Exception e) {
- // Catch any other exceptions (e.g., network issues)
- future.setException(e);
- }
- return future;
- }
-
- try (final ConfigNodeClient configNodeClient =
- CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TCreatePipeReq req =
- new TCreatePipeReq()
- .setPipeName(pipeName)
- .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
- .setExtractorAttributes(createPipeStatement.getSourceAttributes())
- .setProcessorAttributes(createPipeStatement.getProcessorAttributes())
- .setConnectorAttributes(createPipeStatement.getSinkAttributes());
- final TSStatus tsStatus = configNodeClient.createPipe(req);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- future.setException(new IoTDBException(tsStatus));
} else {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ final TCreatePipeReq req =
+ new TCreatePipeReq()
+ .setPipeName(pipeName)
+ .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+ .setExtractorAttributes(createPipeStatement.getSourceAttributes())
+ .setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+ .setConnectorAttributes(createPipeStatement.getSinkAttributes());
+ final TSStatus tsStatus = configNodeClient.createPipe(req);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ future.setException(new IoTDBException(tsStatus));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
}
} catch (final Exception e) {
future.setException(e);