This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef35e4537e Cleaned multiple potential problems in pipe module (#17396)
6ef35e4537e is described below

commit 6ef35e4537e623e17da2d486082edd5f1d44095d
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 1 09:34:25 2026 +0800

    Cleaned multiple potential problems in pipe module (#17396)
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * gras-shop
    
    * fix
    
    * spls
    
    * fix
    
    * pipe-dn
    
    * logger-bug
    
    * fix
---
 .../runtime/heartbeat/PipeHeartbeat.java           |  2 +-
 .../pipe/source/IoTDBConfigRegionSource.java       |  6 ++--
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |  2 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  2 +-
 .../agent/task/connection/PipeEventCollector.java  |  2 +-
 .../sink/PipeRealtimePriorityBlockingQueue.java    | 11 ++----
 .../common/deletion/PipeDeleteDataNodeEvent.java   |  3 +-
 .../pipe/metric/overview/PipeResourceMetrics.java  |  2 +-
 .../receiver/PipeDataNodeReceiverMetrics.java      |  4 +--
 .../processor/aggregate/AggregateProcessor.java    |  4 +--
 .../iotconsensusv2/IoTConsensusV2Receiver.java     | 25 +++++++++----
 .../IoTConsensusV2ReceiverAgent.java               |  2 +-
 .../resource/memory/PipeDynamicMemoryBlock.java    |  4 +--
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  4 ++-
 .../client/IoTDBDataNodeAsyncClientManager.java    |  4 +--
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  2 +-
 .../websocket/WebSocketConnectorServer.java        | 23 ++++++++----
 .../util/builder/PipeTableModelTsFileBuilder.java  |  9 ++---
 .../util/builder/PipeTreeModelTsFileBuilder.java   |  9 ++---
 .../pipe/sink/util/builder/PipeTsFileBuilder.java  |  4 +--
 .../source/dataregion/IoTDBDataRegionSource.java   |  2 +-
 .../PipeRealtimeDataRegionHybridSource.java        | 34 ++----------------
 .../realtime/PipeRealtimeDataRegionLogSource.java  | 42 ++++------------------
 .../realtime/PipeRealtimeDataRegionSource.java     | 35 ++----------------
 .../PipeRealtimeDataRegionTsFileSource.java        | 18 ++--------
 .../TsFileDeduplicationBlockingPendingQueue.java   |  2 +-
 .../task/connection/BlockingPendingQueue.java      | 21 +----------
 .../pipe/datastructure/pattern/TreePattern.java    | 11 ++++--
 .../pipe/sink/client/IoTDBSyncClientManager.java   |  2 +-
 29 files changed, 99 insertions(+), 192 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 13a3c4b83d6..e7e9d2cd97d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -55,7 +55,7 @@ public class PipeHeartbeat {
       // the final results and namely these dataNodes are omitted in calculation.
       remainingEventCountMap.put(
           pipeMeta.getStaticMeta(),
-          Objects.nonNull(pipeCompletedListFromAgent)
+          Objects.nonNull(pipeRemainingEventCountListFromAgent)
               ? pipeRemainingEventCountListFromAgent.get(i)
               : 0L);
       remainingTimeMap.put(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index 07ebb638313..762e8c154e5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -153,8 +153,10 @@ public class IoTDBConfigRegionSource extends 
IoTDBNonDataRegionSource {
   @Override
   public synchronized EnrichedEvent supply() throws Exception {
     final EnrichedEvent event = super.supply();
-    PipeEventCommitManager.getInstance()
-        .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+    if (Objects.nonNull(event)) {
+      PipeEventCommitManager.getInstance()
+          .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+    }
     return event;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 1e1d3388f75..01289109371 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -78,7 +78,7 @@ class PipeAgentLauncher {
         curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + offset));
         offset++;
       }
-      index += (offset + 1);
+      index += offset;
       fetchAndSavePipePluginJars(curList);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index ea12513d647..8e6818dff32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -706,7 +706,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
               needMemory,
               freeMemorySizeInBytes,
-              freeMemorySizeInBytes,
+              reservedMemorySizeInBytes,
               
PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes());
       LOGGER.warn(message);
       throw new PipeException(message);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index ec2a4850693..a22848ee3ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -238,7 +238,7 @@ public class PipeEventCollector implements EventCollector {
       ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
     }
 
-    pendingQueue.directOffer(event);
+    pendingQueue.offer(event);
     collectInvocationCount.incrementAndGet();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 2baebeedc18..6d227ac31fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -72,7 +72,7 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   }
 
   @Override
-  public boolean directOffer(final Event event) {
+  public boolean offer(final Event event) {
     checkBeforeOffer(event);
 
     if (event instanceof TsFileInsertionEvent) {
@@ -85,18 +85,13 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
       ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
       return false;
     } else {
-      return super.directOffer(event);
+      return super.offer(event);
     }
   }
 
-  @Override
-  public boolean waitedOffer(final Event event) {
-    return directOffer(event);
-  }
-
   @Override
   public boolean put(final Event event) {
-    directOffer(event);
+    offer(event);
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
index d0f720c2743..f49f2b8bc83 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
@@ -193,7 +192,7 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent 
implements Serializab
   @Override
   public void deserializeFromByteBuffer(final ByteBuffer buffer) {
     isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
-    deleteDataNode = (DeleteDataNode) PlanNodeType.deserialize(buffer);
+    deleteDataNode = (AbstractDeleteDataNode) PlanNodeType.deserialize(buffer);
     progressIndex = deleteDataNode.getProgressIndex();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
index 4156aea4bc5..eeb840ce0c5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
@@ -138,7 +138,7 @@ public class PipeResourceMetrics implements IMetricSet {
     // phantom reference count
     metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
 
-    metricService.remove(MetricType.RATE, 
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
+    metricService.remove(MetricType.COUNTER, 
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
   }
 
   public void recordDiskIO(final long bytes) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
index 1e877a7b2a0..82defa546bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
@@ -338,14 +338,14 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
         Tag.NAME.toString(),
         RECEIVER,
         Tag.TYPE.toString(),
-        "handshakeDatanodeV1");
+        "handshakeDataNodeV1");
     metricService.remove(
         MetricType.TIMER,
         Metric.PIPE_DATANODE_RECEIVER.toString(),
         Tag.NAME.toString(),
         RECEIVER,
         Tag.TYPE.toString(),
-        "handshakeDatanodeV2");
+        "handshakeDataNodeV2");
     metricService.remove(
         MetricType.TIMER,
         Metric.PIPE_DATANODE_RECEIVER.toString(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 10a72a0d9c5..43b09940780 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -744,7 +744,7 @@ public class AggregateProcessor implements PipeProcessor {
               throw new UnsupportedOperationException(
                   String.format(
                       "The output tablet does not support column type %s",
-                      valueColumnTypes[rowIndex]));
+                      valueColumnTypes[columnIndex]));
           }
         } else {
           bitMaps[columnIndex].mark(rowIndex);
@@ -758,7 +758,7 @@ public class AggregateProcessor implements PipeProcessor {
     int filteredCount = 0;
     for (int i = 0; i < columnNameStringList.length; ++i) {
       if (!bitMaps[i].isAllMarked()) {
-        originColumnIndex2FilteredColumnIndexMapperList[i] = ++filteredCount;
+        originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index 77eb1d0a6fc..adf80654445 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -555,6 +555,15 @@ public class IoTConsensusV2Receiver {
         }
       }
 
+      if (req.getFileNames().size() < 2) {
+        return new TIoTConsensusV2TransferResp(
+            RpcUtils.getStatus(
+                TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
+                String.format(
+                    "Failed to seal file %s, because the number of files is less than 2.",
+                    req.getFileNames())));
+      }
+
       // Sync here is necessary to ensure that the data is written to the disk. Or data region may
       // load the file before the data is written to the disk and cause unexpected behavior after
       // system restart. (e.g., empty file in data region's data directory)
@@ -952,6 +961,7 @@ public class IoTConsensusV2Receiver {
 
   private class IoTConsensusV2TsFileWriterPool {
     private final Lock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
     private final List<IoTConsensusV2TsFileWriter> iotConsensusV2TsFileWriterPool =
         new ArrayList<>();
     private final ConsensusPipeName consensusPipeName;
@@ -1014,15 +1024,18 @@ public class IoTConsensusV2Receiver {
           while (!tsFileWriter.isPresent()) {
             tsFileWriter =
                 iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
-            Thread.sleep(RETRY_WAIT_TIME);
+            condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
           }
           tsFileWriter.get().setUsed(true);
           tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
           Thread.currentThread().interrupt();
-          LOGGER.warn(
-              "IoTConsensusV2{}: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
-              consensusPipeName);
+          final String errorStr =
+              String.format(
+                  "IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
+                  consensusPipeName);
+          LOGGER.warn(errorStr);
+          throw new RuntimeException(errorStr);
         } finally {
           lock.unlock();
         }
@@ -1057,7 +1070,7 @@ public class IoTConsensusV2Receiver {
                 && tsFileWriter.isUsed()) {
               try {
                 Thread.sleep(RETRY_WAIT_TIME);
-              } catch (InterruptedException e) {
+              } catch (final InterruptedException e) {
                 Thread.currentThread().interrupt();
                 LOGGER.warn(
                     "IoTConsensusV2-PipeName-{}: receiver thread get interrupted when exiting.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
index 481d8dd7370..be2659c1cf4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2ReceiverAgent.java
@@ -243,7 +243,7 @@ public class IoTConsensusV2ReceiverAgent implements 
ConsensusPipeReceiver {
                 ConsensusPipeName consensusPipeName = receiverEntry.getKey();
                 AtomicReference<IoTConsensusV2Receiver> receiverReference =
                     receiverEntry.getValue();
-                if (receiverReference != null) {
+                if (receiverReference != null && receiverReference.get() != null) {
                   receiverReference.get().handleExit();
                   receiverReference.set(null);
                 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
index 4e33b871828..3dbac912db3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
@@ -44,7 +44,7 @@ public class PipeDynamicMemoryBlock {
 
   PipeDynamicMemoryBlock(
       final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long 
memoryUsageInBytes) {
-    this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
+    this.memoryUsageInBytes = Math.max(memoryUsageInBytes, 0);
     this.fixedMemoryBlock = fixedMemoryBlock;
   }
 
@@ -116,7 +116,7 @@ public class PipeDynamicMemoryBlock {
       if (Double.isNaN(historyMemoryEfficiency)
           || Double.isInfinite(historyMemoryEfficiency)
           || historyMemoryEfficiency < 0.0) {
-        currentMemoryEfficiency = 0.0;
+        historyMemoryEfficiency = 0.0;
       }
 
       this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index 0ba204fcbd2..458434d8494 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -220,7 +220,9 @@ public class PipeMemoryWeightUtil {
       totalSizeInBytes +=
           NUM_BYTES_ARRAY_HEADER + (long) NUM_BYTES_OBJECT_REF * measurementSchemas.size();
       for (IMeasurementSchema measurementSchema : measurementSchemas) {
-        InsertNodeMemoryEstimator.sizeOfMeasurementSchema((MeasurementSchema) measurementSchema);
+        totalSizeInBytes +=
+            InsertNodeMemoryEstimator.sizeOfMeasurementSchema(
+                (MeasurementSchema) measurementSchema);
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 73e2213eea6..88a79146295 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -477,7 +477,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
       while (true) {
         final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() * clientSize));
-        if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+        if (isUnhealthy(targetNodeUrl) && n < clientSize) {
           n++;
           continue;
         }
@@ -498,7 +498,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       long n = 0;
       while (true) {
         for (final TEndPoint targetNodeUrl : endPointList) {
-          if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+          if (isUnhealthy(targetNodeUrl) && n < clientSize) {
             n++;
             continue;
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 622f4e4f0cd..7f904324bbb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -172,7 +172,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       throw new PipeConnectionException(
           String.format(
               "Network error when transfer tsfile event %s, because %s.",
-              ((PipeDeleteDataNodeEvent) event).coreReportMessage(), e.getMessage()),
+              ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
           e);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 240de912f6f..491d40a1e45 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,14 +92,18 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
           eventsWaitingForTransfer.remove(pipeName);
       while (!eventTransferQueue.isEmpty()) {
-        eventTransferQueue.forEach(
+        final List<EventWaitingForTransfer> eventWrappers;
+        synchronized (eventTransferQueue) {
+          eventWrappers = new ArrayList<>(eventTransferQueue);
+          eventTransferQueue.clear();
+        }
+        eventWrappers.forEach(
             (eventWrapper) -> {
               if (eventWrapper.event instanceof EnrichedEvent) {
                 ((EnrichedEvent) eventWrapper.event)
                     
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
               }
             });
-        eventTransferQueue.clear();
         synchronized (eventTransferQueue) {
           eventTransferQueue.notifyAll();
         }
@@ -270,8 +276,10 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
 
     LOGGER.warn(
         "The tablet of commitId: {} can't be parsed by client, it will be retried later.", eventId);
-    eventTransferQueue.put(
-        new EventWaitingForTransfer(eventId, eventWrapper.connector, 
eventWrapper.event));
+    synchronized (eventTransferQueue) {
+      eventTransferQueue.put(
+          new EventWaitingForTransfer(eventId, eventWrapper.connector, 
eventWrapper.event));
+    }
   }
 
   @Override
@@ -321,7 +329,9 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       }
     }
 
-    queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), 
connector, event));
+    synchronized (queue) {
+      queue.put(new 
EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
+    }
   }
 
   private class TransferThread extends Thread {
@@ -347,7 +357,8 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
           }
 
           try {
-            final EventWaitingForTransfer queueElement = queue.take();
+            EventWaitingForTransfer queueElement;
+            queueElement = queue.take();
             synchronized (queue) {
               queue.notifyAll();
             }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
index 52c18a10da9..c105a86ff77 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
@@ -160,18 +160,19 @@ public class PipeTableModelTsFileBuilder extends 
PipeTsFileBuilder {
             e.getMessage(),
             e);
 
+        final File file = fileWriter.getIOWriter().getFile();
         try {
           fileWriter.close();
         } catch (final Exception closeException) {
           LOGGER.warn(
               "Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}",
               currentBatchId.get(),
-              fileWriter.getIOWriter().getFile().getPath(),
+              file.getPath(),
               closeException.getMessage(),
               closeException);
         } finally {
           // Add current writing file to the list and delete the file
-          sealedFiles.add(new Pair<>(dataBase, 
fileWriter.getIOWriter().getFile()));
+          sealedFiles.add(new Pair<>(dataBase, file));
         }
 
         for (final Pair<String, File> sealedFile : sealedFiles) {
@@ -181,7 +182,7 @@ public class PipeTableModelTsFileBuilder extends 
PipeTsFileBuilder {
               currentBatchId.get(),
               deleteSuccess ? "Successfully" : "Failed to",
               sealedFile.right.getPath(),
-              fileWriter.getIOWriter().getFile().getPath(),
+              file.getPath(),
               deleteSuccess ? "" : "Maybe the tsfile needs to be deleted manually.");
         }
         sealedFiles.clear();
@@ -191,8 +192,8 @@ public class PipeTableModelTsFileBuilder extends 
PipeTsFileBuilder {
         throw e;
       }
 
-      fileWriter.close();
       final File sealedFile = fileWriter.getIOWriter().getFile();
+      fileWriter.close();
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
             "Batch id = {}: Seal tsfile {} successfully.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
index d46180bd793..89827cd8421 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
@@ -155,18 +155,19 @@ public class PipeTreeModelTsFileBuilder extends 
PipeTsFileBuilder {
             e.getMessage(),
             e);
 
+        final File file = fileWriter.getIOWriter().getFile();
         try {
           fileWriter.close();
         } catch (final Exception closeException) {
           LOGGER.warn(
               "Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}",
               currentBatchId.get(),
-              fileWriter.getIOWriter().getFile().getPath(),
+              file.getPath(),
               closeException.getMessage(),
               closeException);
         } finally {
           // Add current writing file to the list and delete the file
-          sealedFiles.add(new Pair<>(null, 
fileWriter.getIOWriter().getFile()));
+          sealedFiles.add(new Pair<>(null, file));
         }
 
         for (final Pair<String, File> sealedFile : sealedFiles) {
@@ -176,7 +177,7 @@ public class PipeTreeModelTsFileBuilder extends 
PipeTsFileBuilder {
               currentBatchId.get(),
               deleteSuccess ? "Successfully" : "Failed to",
               sealedFile.right.getPath(),
-              fileWriter.getIOWriter().getFile().getPath(),
+              file.getPath(),
               deleteSuccess ? "" : "Maybe the tsfile needs to be deleted manually.");
         }
         sealedFiles.clear();
@@ -186,8 +187,8 @@ public class PipeTreeModelTsFileBuilder extends 
PipeTsFileBuilder {
         throw e;
       }
 
-      fileWriter.close();
       final File sealedFile = fileWriter.getIOWriter().getFile();
+      fileWriter.close();
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
             "Batch id = {}: Seal tsfile {} successfully.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
index 81557963822..217adeed43e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
@@ -106,9 +106,7 @@ public abstract class PipeTsFileBuilder {
         return baseDir;
       }
       throw new PipeException(
-          String.format(
-              "Failed to create batch file dir %s. (Batch id = %s)",
-              baseDir.getPath(), currentBatchId.get()));
+          String.format("Failed to create batch file dir. (Batch id = %s)", currentBatchId.get()));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 32e5c300f7d..1b06aab8a07 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -532,7 +532,7 @@ public class IoTDBDataRegionSource extends IoTDBSource {
         EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) {
       watermarkIntervalInMs =
           parameters.getLongOrDefault(
-              Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, 
_SOURCE_WATERMARK_INTERVAL_KEY),
+              Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, 
SOURCE_WATERMARK_INTERVAL_KEY),
               EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE);
     } else if (parameters.hasAnyAttributes(
         _EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 0721683f4d2..c9e3f35288a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -58,7 +58,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
-      extractDirectly(event);
+      pendingQueue.offer(event);
     } else {
       throw new UnsupportedOperationException(
           String.format(
@@ -116,21 +116,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
         if (state == TsFileEpoch.State.USING_BOTH) {
           event.skipReportOnCommit();
         }
-        if (!pendingQueue.waitedOffer(event)) {
-          // This would not happen, but just in case.
-          // pendingQueue is unbounded, so it should never reach capacity.
-          final String errorMessage =
-              String.format(
-                  "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
-                      + "has reached capacity, discard tablet event %s, 
current state %s",
-                  this, event, event.getTsFileEpoch().getState(this));
-          LOGGER.error(errorMessage);
-          PipeDataNodeAgent.runtime()
-              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-          // Ignore the tablet event.
-          
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
 false);
-        }
+        pendingQueue.offer(event);
         break;
       default:
         throw new UnsupportedOperationException(
@@ -176,21 +162,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       case EMPTY:
       case USING_TSFILE:
       case USING_BOTH:
-        if (!pendingQueue.waitedOffer(event)) {
-          // This would not happen, but just in case.
-          // pendingQueue is unbounded, so it should never reach capacity.
-          final String errorMessage =
-              String.format(
-                  "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
-                      + "has reached capacity, discard TsFile event %s, 
current state %s",
-                  this, event, event.getTsFileEpoch().getState(this));
-          LOGGER.error(errorMessage);
-          PipeDataNodeAgent.runtime()
-              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-          // Ignore the tsfile event.
-          
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
 false);
-        }
+        pendingQueue.offer(event);
         break;
       default:
         throw new UnsupportedOperationException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index 35a2c7190c8..579310b3f15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -40,7 +40,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
       LoggerFactory.getLogger(PipeRealtimeDataRegionLogSource.class);
 
   @Override
-  protected void doExtract(PipeRealtimeEvent event) {
+  protected void doExtract(final PipeRealtimeEvent event) {
     final Event eventToExtract = event.getEvent();
 
     if (eventToExtract instanceof TabletInsertionEvent) {
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
-      extractDirectly(event);
+      pendingQueue.offer(event);
     } else {
       throw new UnsupportedOperationException(
           String.format(
@@ -60,27 +60,12 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
     }
   }
 
-  private void extractTabletInsertion(PipeRealtimeEvent event) {
+  private void extractTabletInsertion(final PipeRealtimeEvent event) {
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TABLET);
-
-    if (!pendingQueue.waitedOffer(event)) {
-      // this would not happen, but just in case.
-      // pendingQueue is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s 
"
-                  + "has reached capacity, discard tablet event %s, current 
state %s",
-              this, event, event.getTsFileEpoch().getState(this));
-      LOGGER.error(errorMessage);
-      PipeDataNodeAgent.runtime()
-          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // ignore this event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(), 
false);
-    }
+    pendingQueue.offer(event);
   }
 
-  private void extractTsFileInsertion(PipeRealtimeEvent event) {
+  private void extractTsFileInsertion(final PipeRealtimeEvent event) {
     final PipeTsFileInsertionEvent tsFileInsertionEvent =
         (PipeTsFileInsertionEvent) event.getEvent();
     if (!(tsFileInsertionEvent.isLoaded())) {
@@ -91,22 +76,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
     }
 
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-
-    if (!pendingQueue.waitedOffer(event)) {
-      // this would not happen, but just in case.
-      // pendingQueue is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s 
"
-                  + "has reached capacity, discard loaded tsFile event %s, 
current state %s",
-              this, event, event.getTsFileEpoch().getState(this));
-      LOGGER.error(errorMessage);
-      PipeDataNodeAgent.runtime()
-          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // ignore this event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(), 
false);
-    }
+    pendingQueue.offer(event);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 204c9c87d14..1e47a48e500 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -423,21 +423,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
       return;
     }
 
-    if (!pendingQueue.waitedOffer(event)) {
-      // This would not happen, but just in case.
-      // pendingQueue is unbounded, so it should never reach capacity.
-      LOGGER.error(
-          "extract: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
-              + "has reached capacity, discard heartbeat event {}",
-          this,
-          event);
-
-      // Do not report exception since the PipeHeartbeatEvent doesn't affect
-      // the correction of pipe progress.
-
-      // Ignore this event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionSource.class.getName(), 
false);
-    }
+    pendingQueue.offer(event);
   }
 
   protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
@@ -449,24 +435,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
               
.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
       return;
     }
-    extractDirectly(event);
-  }
-
-  protected void extractDirectly(final PipeRealtimeEvent event) {
-    if (!pendingQueue.waitedOffer(event)) {
-      // This would not happen, but just in case.
-      // Pending is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extract: pending queue of %s %s " + "has reached capacity, 
discard event %s",
-              this.getClass().getSimpleName(), this, event);
-      LOGGER.error(errorMessage);
-      PipeDataNodeAgent.runtime()
-          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // Ignore the event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionSource.class.getName(), 
false);
-    }
+    pendingQueue.offer(event);
   }
 
   protected void maySkipIndex4Event(final PipeRealtimeEvent event) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 8f74bc63fb3..98bfb30391a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -46,7 +46,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
     }
 
     if (event.getEvent() instanceof PipeDeleteDataNodeEvent) {
-      extractDirectly(event);
+      pendingQueue.offer(event);
       return;
     }
 
@@ -59,21 +59,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
       return;
     }
 
-    if (!pendingQueue.waitedOffer(event)) {
-      // This would not happen, but just in case.
-      // Pending is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor 
%s "
-                  + "has reached capacity, discard TsFile event %s, current 
state %s",
-              this, event, event.getTsFileEpoch().getState(this));
-      LOGGER.error(errorMessage);
-      PipeDataNodeAgent.runtime()
-          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // Ignore the event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
-    }
+    pendingQueue.offer(event);
 
     event.getTsFileEpoch().clearState(this);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index e4a3a545a58..b1677c9ffe6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -65,7 +65,7 @@ public class TsFileDeduplicationBlockingPendingQueue extends 
SubscriptionBlockin
 
   @Override
   public void directOffer(final Event event) {
-    inputPendingQueue.directOffer(event);
+    inputPendingQueue.offer(event);
   }
 
   private synchronized Event filter(final Event event) { // make it 
synchronized
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index aa93b093254..8773b03f9f3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -50,26 +50,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
     this.eventCounter = eventCounter;
   }
 
-  public boolean waitedOffer(final E event) {
-    checkBeforeOffer(event);
-    try {
-      final boolean offered =
-          pendingQueue.offer(
-              event,
-              
PIPE_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(),
-              TimeUnit.MILLISECONDS);
-      if (offered) {
-        eventCounter.increaseEventCount(event);
-      }
-      return offered;
-    } catch (final InterruptedException e) {
-      LOGGER.info("pending queue offer is interrupted.", e);
-      Thread.currentThread().interrupt();
-      return false;
-    }
-  }
-
-  public boolean directOffer(final E event) {
+  public boolean offer(final E event) {
     checkBeforeOffer(event);
     final boolean offered = pendingQueue.offer(event);
     if (offered) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 0d6e7fcdad5..6631f60a65c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -477,10 +477,20 @@ public abstract class TreePattern {
     final List<TreePattern> sortedPatterns = new ArrayList<>(patterns);
     sortedPatterns.sort(
         (o1, o2) -> {
+          final List<PartialPath> p1List = o1.getBaseInclusionPaths();
+          final List<PartialPath> p2List = o2.getBaseInclusionPaths();
+
+          // A pattern without inclusion paths has no representative path to compare, so order
+          // such patterns first. Two path-less patterns compare as equal (0) to keep the
+          // comparator contract, and a path-less o2 is handled here before get(0) below.
+          if (p1List.isEmpty() || p2List.isEmpty()) {
+            return Boolean.compare(p2List.isEmpty(), p1List.isEmpty());
+          }
+
           // We can only approximate comparison here since TreePattern represents multiple paths.
           // We use the first inclusion path as a representative.
-          final PartialPath p1 = o1.getBaseInclusionPaths().get(0);
-          final PartialPath p2 = o2.getBaseInclusionPaths().get(0);
+          final PartialPath p1 = p1List.get(0);
+          final PartialPath p2 = p2List.get(0);
 
           // 1. Length: Shorter is generally broader (e.g., root.** vs root.sg.d1)
           final int lenCompare = Integer.compare(p1.getNodeLength(), p2.getNodeLength());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 76c145d0dab..61250eaa038 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -374,7 +374,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
         final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
             endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
         if (Boolean.TRUE.equals(nextClientAndStatus.getRight())
-            && clientAndStatus.getLeft() != null) {
+            && nextClientAndStatus.getLeft() != null) {
           return nextClientAndStatus;
         }
       }


Reply via email to