This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 82cb55e3cb2 [To dev/1.3] Pipe: Fixed the bug that the schema may be 
sent twice when split-enabled & database may be null for non-first schema pipes 
(#16586) (#16589)
82cb55e3cb2 is described below

commit 82cb55e3cb25b43b2ef97fced4f8a95f509eeec4
Author: Caideyipi <[email protected]>
AuthorDate: Wed Oct 15 20:00:22 2025 +0800

    [To dev/1.3] Pipe: Fixed the bug that the schema may be sent twice when 
split-enabled & database may be null for non-first schema pipes (#16586) 
(#16589)
---
 .../source/schemaregion/IoTDBSchemaRegionSource.java  |  5 +++++
 .../config/executor/ClusterConfigTaskExecutor.java    | 19 +++++++++++++------
 .../queue/listening/AbstractPipeListeningQueue.java   | 10 ++++++----
 .../commons/pipe/source/IoTDBNonDataRegionSource.java |  8 ++++----
 4 files changed, 28 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
index 93e2c6bd3d7..808be515de7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java
@@ -92,6 +92,11 @@ public class IoTDBSchemaRegionSource extends 
IoTDBNonDataRegionSource {
         == 1) {
       SchemaRegionConsensusImpl.getInstance()
           .write(schemaRegionId, new PipeOperateSchemaQueueNode(new 
PlanNodeId(""), true));
+    } else if 
(!PipeDataNodeAgent.runtime().schemaListener(schemaRegionId).isOpened()) {
+      // This may be being concurrently opened, we should not continue to 
start or else the snapshot
+      // may not be listened
+      
PipeDataNodeAgent.runtime().decreaseAndGetSchemaListenerReferenceCount(schemaRegionId);
+      return;
     }
 
     super.start();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 75a7e3f158d..fdcda582e7a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1802,10 +1802,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
     // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, 
or both realtime
     // and history are true), the pipe is split into history-only and 
realtime–only modes.
-    final PipeParameters extractorPipeParameters =
+    final PipeParameters sourcePipeParameters =
         new PipeParameters(createPipeStatement.getExtractorAttributes());
     if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
-        && PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
+        && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
       try (final ConfigNodeClient configNodeClient =
           
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
         // 1. Send request to create the real-time data synchronization 
pipeline
@@ -1817,7 +1817,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                 .setIfNotExistsCondition(true)
                 // Use extractor parameters for real-time data
                 .setExtractorAttributes(
-                    extractorPipeParameters
+                    sourcePipeParameters
                         .addOrReplaceEquivalentAttributesWithClone(
                             new PipeParameters(
                                 ImmutableMap.of(
@@ -1842,16 +1842,23 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                 // Append suffix to the pipeline name for historical data
                 .setPipeName(createPipeStatement.getPipeName() + "_history")
                 
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-                // Use extractor parameters for historical data
+                // Use source parameters for historical data
                 .setExtractorAttributes(
-                    extractorPipeParameters
+                    sourcePipeParameters
                         .addOrReplaceEquivalentAttributesWithClone(
                             new PipeParameters(
                                 ImmutableMap.of(
                                     
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
                                     Boolean.toString(false),
                                     
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
-                                    Boolean.toString(true))))
+                                    Boolean.toString(true),
+                                    // We force the historical pipe to 
transfer data only
+                                    // Thus we can transfer schema only once
+                                    // And may drop the historical pipe on 
successfully transferred
+                                    PipeSourceConstant.SOURCE_INCLUSION_KEY,
+                                    
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
+                                    PipeSourceConstant.SOURCE_EXCLUSION_KEY,
+                                    
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
                         .getAttribute())
                 
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
                 
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
index c5d83845f5c..cb57260e411 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
@@ -76,10 +76,12 @@ public abstract class AbstractPipeListeningQueue extends 
AbstractSerializableLis
     }
   }
 
-  public synchronized Pair<Long, List<PipeSnapshotEvent>> 
findAvailableSnapshots() {
-    if (queueTailIndex2SnapshotsCache.getLeft()
-        < queue.getTailIndex()
-            - 
PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
+  public synchronized Pair<Long, List<PipeSnapshotEvent>> 
findAvailableSnapshots(
+      final boolean mayClear) {
+    if (mayClear
+        && queueTailIndex2SnapshotsCache.getLeft()
+            < queue.getTailIndex()
+                - 
PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
       clearSnapshots();
     }
     return queueTailIndex2SnapshotsCache;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 1639c9e759f..837ce2faed2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -101,10 +101,10 @@ public abstract class IoTDBNonDataRegionSource extends 
IoTDBSource {
   private long getNextIndexAfterSnapshot() {
     long nextIndex;
     if (needTransferSnapshot()) {
-      nextIndex = findSnapshot();
+      nextIndex = findSnapshot(true);
       if (nextIndex == Long.MIN_VALUE) {
         triggerSnapshot();
-        nextIndex = findSnapshot();
+        nextIndex = findSnapshot(false);
         if (nextIndex == Long.MIN_VALUE) {
           throw new PipeException("Cannot get the newest snapshot after 
triggering one.");
         }
@@ -117,9 +117,9 @@ public abstract class IoTDBNonDataRegionSource extends 
IoTDBSource {
     return nextIndex;
   }
 
-  private long findSnapshot() {
+  private long findSnapshot(final boolean mayClear) {
     final Pair<Long, List<PipeSnapshotEvent>> queueTailIndex2Snapshots =
-        getListeningQueue().findAvailableSnapshots();
+        getListeningQueue().findAvailableSnapshots(mayClear);
     final long nextIndex =
         Objects.nonNull(queueTailIndex2Snapshots.getLeft())
                 && queueTailIndex2Snapshots.getLeft() != Long.MIN_VALUE

Reply via email to