This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 82cb55e3cb2 [To dev/1.3] Pipe: Fixed the bug that the schema may be
sent twice when split-enabled & database may be null for non-first schema pipes
(#16586) (#16589)
82cb55e3cb2 is described below
commit 82cb55e3cb25b43b2ef97fced4f8a95f509eeec4
Author: Caideyipi <[email protected]>
AuthorDate: Wed Oct 15 20:00:22 2025 +0800
[To dev/1.3] Pipe: Fixed the bug that the schema may be sent twice when
split-enabled & database may be null for non-first schema pipes (#16586)
(#16589)
---
.../source/schemaregion/IoTDBSchemaRegionSource.java | 5 +++++
.../config/executor/ClusterConfigTaskExecutor.java | 19 +++++++++++++------
.../queue/listening/AbstractPipeListeningQueue.java | 10 ++++++----
.../commons/pipe/source/IoTDBNonDataRegionSource.java | 8 ++++----
4 files changed, 28 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
index 93e2c6bd3d7..808be515de7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
@@ -92,6 +92,11 @@ public class IoTDBSchemaRegionSource extends
IoTDBNonDataRegionSource {
== 1) {
SchemaRegionConsensusImpl.getInstance()
.write(schemaRegionId, new PipeOperateSchemaQueueNode(new
PlanNodeId(""), true));
+ } else if
(!PipeDataNodeAgent.runtime().schemaListener(schemaRegionId).isOpened()) {
+ // This may be being concurrently opened, we should not continue to
start or else the snapshot
+ // may not be listened
+
PipeDataNodeAgent.runtime().decreaseAndGetSchemaListenerReferenceCount(schemaRegionId);
+ return;
}
super.start();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 75a7e3f158d..fdcda582e7a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1802,10 +1802,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode,
or both realtime
// and history are true), the pipe is split into history-only and
realtime–only modes.
- final PipeParameters extractorPipeParameters =
+ final PipeParameters sourcePipeParameters =
new PipeParameters(createPipeStatement.getExtractorAttributes());
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
- && PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
+ && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// 1. Send request to create the real-time data synchronization
pipeline
@@ -1817,7 +1817,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.setIfNotExistsCondition(true)
// Use extractor parameters for real-time data
.setExtractorAttributes(
- extractorPipeParameters
+ sourcePipeParameters
.addOrReplaceEquivalentAttributesWithClone(
new PipeParameters(
ImmutableMap.of(
@@ -1842,16 +1842,23 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// Append suffix to the pipeline name for historical data
.setPipeName(createPipeStatement.getPipeName() + "_history")
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
- // Use extractor parameters for historical data
+ // Use source parameters for historical data
.setExtractorAttributes(
- extractorPipeParameters
+ sourcePipeParameters
.addOrReplaceEquivalentAttributesWithClone(
new PipeParameters(
ImmutableMap.of(
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
Boolean.toString(false),
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
- Boolean.toString(true))))
+ Boolean.toString(true),
+ // We force the historical pipe to
transfer data only
+ // Thus we can transfer schema only once
+ // And may drop the historical pipe on
successfully transferred
+ PipeSourceConstant.SOURCE_INCLUSION_KEY,
+
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
+ PipeSourceConstant.SOURCE_EXCLUSION_KEY,
+
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
index c5d83845f5c..cb57260e411 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
@@ -76,10 +76,12 @@ public abstract class AbstractPipeListeningQueue extends
AbstractSerializableLis
}
}
- public synchronized Pair<Long, List<PipeSnapshotEvent>>
findAvailableSnapshots() {
- if (queueTailIndex2SnapshotsCache.getLeft()
- < queue.getTailIndex()
- -
PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
+ public synchronized Pair<Long, List<PipeSnapshotEvent>>
findAvailableSnapshots(
+ final boolean mayClear) {
+ if (mayClear
+ && queueTailIndex2SnapshotsCache.getLeft()
+ < queue.getTailIndex()
+ -
PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
clearSnapshots();
}
return queueTailIndex2SnapshotsCache;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 1639c9e759f..837ce2faed2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -101,10 +101,10 @@ public abstract class IoTDBNonDataRegionSource extends
IoTDBSource {
private long getNextIndexAfterSnapshot() {
long nextIndex;
if (needTransferSnapshot()) {
- nextIndex = findSnapshot();
+ nextIndex = findSnapshot(true);
if (nextIndex == Long.MIN_VALUE) {
triggerSnapshot();
- nextIndex = findSnapshot();
+ nextIndex = findSnapshot(false);
if (nextIndex == Long.MIN_VALUE) {
throw new PipeException("Cannot get the newest snapshot after
triggering one.");
}
@@ -117,9 +117,9 @@ public abstract class IoTDBNonDataRegionSource extends
IoTDBSource {
return nextIndex;
}
- private long findSnapshot() {
+ private long findSnapshot(final boolean mayClear) {
final Pair<Long, List<PipeSnapshotEvent>> queueTailIndex2Snapshots =
- getListeningQueue().findAvailableSnapshots();
+ getListeningQueue().findAvailableSnapshots(mayClear);
final long nextIndex =
Objects.nonNull(queueTailIndex2Snapshots.getLeft())
&& queueTailIndex2Snapshots.getLeft() != Long.MIN_VALUE