This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-garbage-cleaning
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-garbage-cleaning by this 
push:
     new 0bd36cac877 fix
0bd36cac877 is described below

commit 0bd36cac877978fb572edcd43b1d64d4637bde15
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 18:02:18 2026 +0800

    fix
---
 .../db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java     | 4 ++--
 .../db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java  | 2 +-
 .../pipe/sink/protocol/websocket/WebSocketConnectorServer.java   | 7 +++++--
 .../db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java   | 9 +++++----
 .../db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java    | 9 +++++----
 .../iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java       | 4 +---
 6 files changed, 19 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 73e2213eea6..88a79146295 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -477,7 +477,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
       while (true) {
         final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() 
* clientSize));
-        if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+        if (isUnhealthy(targetNodeUrl) && n < clientSize) {
           n++;
           continue;
         }
@@ -498,7 +498,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       long n = 0;
       while (true) {
         for (final TEndPoint targetNodeUrl : endPointList) {
-          if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+          if (isUnhealthy(targetNodeUrl) && n < clientSize) {
             n++;
             continue;
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 622f4e4f0cd..7f904324bbb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -172,7 +172,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       throw new PipeConnectionException(
           String.format(
               "Network error when transfer tsfile event %s, because %s.",
-              ((PipeDeleteDataNodeEvent) event).coreReportMessage(), 
e.getMessage()),
+              ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
           e);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 240de912f6f..b2c64de7ead 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,14 +92,15 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
           eventsWaitingForTransfer.remove(pipeName);
       while (!eventTransferQueue.isEmpty()) {
-        eventTransferQueue.forEach(
+        final List<EventWaitingForTransfer> eventWrappers = new 
ArrayList<>(eventTransferQueue);
+        eventTransferQueue.clear();
+        eventWrappers.forEach(
             (eventWrapper) -> {
               if (eventWrapper.event instanceof EnrichedEvent) {
                 ((EnrichedEvent) eventWrapper.event)
                     
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
               }
             });
-        eventTransferQueue.clear();
         synchronized (eventTransferQueue) {
           eventTransferQueue.notifyAll();
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
index 52c18a10da9..c105a86ff77 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
@@ -160,18 +160,19 @@ public class PipeTableModelTsFileBuilder extends 
PipeTsFileBuilder {
             e.getMessage(),
             e);
 
+        final File file = fileWriter.getIOWriter().getFile();
         try {
           fileWriter.close();
         } catch (final Exception closeException) {
           LOGGER.warn(
               "Batch id = {}: Failed to close the tsfile {} after failed to 
write tablets into, because {}",
               currentBatchId.get(),
-              fileWriter.getIOWriter().getFile().getPath(),
+              file.getPath(),
               closeException.getMessage(),
               closeException);
         } finally {
           // Add current writing file to the list and delete the file
-          sealedFiles.add(new Pair<>(dataBase, 
fileWriter.getIOWriter().getFile()));
+          sealedFiles.add(new Pair<>(dataBase, file));
         }
 
         for (final Pair<String, File> sealedFile : sealedFiles) {
@@ -181,7 +182,7 @@ public class PipeTableModelTsFileBuilder extends 
PipeTsFileBuilder {
               currentBatchId.get(),
               deleteSuccess ? "Successfully" : "Failed to",
               sealedFile.right.getPath(),
-              fileWriter.getIOWriter().getFile().getPath(),
+              file.getPath(),
               deleteSuccess ? "" : "Maybe the tsfile needs to be deleted 
manually.");
         }
         sealedFiles.clear();
@@ -191,8 +192,8 @@ public class PipeTableModelTsFileBuilder extends 
PipeTsFileBuilder {
         throw e;
       }
 
-      fileWriter.close();
       final File sealedFile = fileWriter.getIOWriter().getFile();
+      fileWriter.close();
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
             "Batch id = {}: Seal tsfile {} successfully.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
index d46180bd793..89827cd8421 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
@@ -155,18 +155,19 @@ public class PipeTreeModelTsFileBuilder extends 
PipeTsFileBuilder {
             e.getMessage(),
             e);
 
+        final File file = fileWriter.getIOWriter().getFile();
         try {
           fileWriter.close();
         } catch (final Exception closeException) {
           LOGGER.warn(
               "Batch id = {}: Failed to close the tsfile {} after failed to 
write tablets into, because {}",
               currentBatchId.get(),
-              fileWriter.getIOWriter().getFile().getPath(),
+              file.getPath(),
               closeException.getMessage(),
               closeException);
         } finally {
           // Add current writing file to the list and delete the file
-          sealedFiles.add(new Pair<>(null, 
fileWriter.getIOWriter().getFile()));
+          sealedFiles.add(new Pair<>(null, file));
         }
 
         for (final Pair<String, File> sealedFile : sealedFiles) {
@@ -176,7 +177,7 @@ public class PipeTreeModelTsFileBuilder extends 
PipeTsFileBuilder {
               currentBatchId.get(),
               deleteSuccess ? "Successfully" : "Failed to",
               sealedFile.right.getPath(),
-              fileWriter.getIOWriter().getFile().getPath(),
+              file.getPath(),
               deleteSuccess ? "" : "Maybe the tsfile needs to be deleted 
manually.");
         }
         sealedFiles.clear();
@@ -186,8 +187,8 @@ public class PipeTreeModelTsFileBuilder extends 
PipeTsFileBuilder {
         throw e;
       }
 
-      fileWriter.close();
       final File sealedFile = fileWriter.getIOWriter().getFile();
+      fileWriter.close();
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
             "Batch id = {}: Seal tsfile {} successfully.",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
index 81557963822..217adeed43e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
@@ -106,9 +106,7 @@ public abstract class PipeTsFileBuilder {
         return baseDir;
       }
       throw new PipeException(
-          String.format(
-              "Failed to create batch file dir %s. (Batch id = %s)",
-              baseDir.getPath(), currentBatchId.get()));
+          String.format("Failed to create batch file dir. (Batch id = %s)", 
currentBatchId.get()));
     }
   }
 

Reply via email to