This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-garbage-cleaning
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-garbage-cleaning by this
push:
new 0bd36cac877 fix
0bd36cac877 is described below
commit 0bd36cac877978fb572edcd43b1d64d4637bde15
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 18:02:18 2026 +0800
fix
---
.../db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java | 4 ++--
.../db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java | 2 +-
.../pipe/sink/protocol/websocket/WebSocketConnectorServer.java | 7 +++++--
.../db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java | 9 +++++----
.../db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java | 9 +++++----
.../iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java | 4 +---
6 files changed, 19 insertions(+), 16 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 73e2213eea6..88a79146295 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -477,7 +477,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
while (true) {
final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() * clientSize));
- if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+ if (isUnhealthy(targetNodeUrl) && n < clientSize) {
n++;
continue;
}
@@ -498,7 +498,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
long n = 0;
while (true) {
for (final TEndPoint targetNodeUrl : endPointList) {
- if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+ if (isUnhealthy(targetNodeUrl) && n < clientSize) {
n++;
continue;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 622f4e4f0cd..7f904324bbb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -172,7 +172,7 @@ public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink {
throw new PipeConnectionException(
String.format(
"Network error when transfer tsfile event %s, because %s.",
- ((PipeDeleteDataNodeEvent) event).coreReportMessage(), e.getMessage()),
+ ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 240de912f6f..b2c64de7ead 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,14 +92,15 @@ public class WebSocketConnectorServer extends WebSocketServer {
final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
eventsWaitingForTransfer.remove(pipeName);
while (!eventTransferQueue.isEmpty()) {
- eventTransferQueue.forEach(
+ final List<EventWaitingForTransfer> eventWrappers = new ArrayList<>(eventTransferQueue);
+ eventTransferQueue.clear();
+ eventWrappers.forEach(
(eventWrapper) -> {
if (eventWrapper.event instanceof EnrichedEvent) {
((EnrichedEvent) eventWrapper.event)
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
}
});
- eventTransferQueue.clear();
synchronized (eventTransferQueue) {
eventTransferQueue.notifyAll();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
index 52c18a10da9..c105a86ff77 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
@@ -160,18 +160,19 @@ public class PipeTableModelTsFileBuilder extends PipeTsFileBuilder {
e.getMessage(),
e);
+ final File file = fileWriter.getIOWriter().getFile();
try {
fileWriter.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}",
currentBatchId.get(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
closeException.getMessage(),
closeException);
} finally {
// Add current writing file to the list and delete the file
- sealedFiles.add(new Pair<>(dataBase, fileWriter.getIOWriter().getFile()));
+ sealedFiles.add(new Pair<>(dataBase, file));
}
for (final Pair<String, File> sealedFile : sealedFiles) {
@@ -181,7 +182,7 @@ public class PipeTableModelTsFileBuilder extends PipeTsFileBuilder {
currentBatchId.get(),
deleteSuccess ? "Successfully" : "Failed to",
sealedFile.right.getPath(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
deleteSuccess ? "" : "Maybe the tsfile needs to be deleted manually.");
}
sealedFiles.clear();
@@ -191,8 +192,8 @@ public class PipeTableModelTsFileBuilder extends PipeTsFileBuilder {
throw e;
}
- fileWriter.close();
final File sealedFile = fileWriter.getIOWriter().getFile();
+ fileWriter.close();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Seal tsfile {} successfully.",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
index d46180bd793..89827cd8421 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
@@ -155,18 +155,19 @@ public class PipeTreeModelTsFileBuilder extends PipeTsFileBuilder {
e.getMessage(),
e);
+ final File file = fileWriter.getIOWriter().getFile();
try {
fileWriter.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Batch id = {}: Failed to close the tsfile {} after failed to write tablets into, because {}",
currentBatchId.get(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
closeException.getMessage(),
closeException);
} finally {
// Add current writing file to the list and delete the file
- sealedFiles.add(new Pair<>(null, fileWriter.getIOWriter().getFile()));
+ sealedFiles.add(new Pair<>(null, file));
}
for (final Pair<String, File> sealedFile : sealedFiles) {
@@ -176,7 +177,7 @@ public class PipeTreeModelTsFileBuilder extends PipeTsFileBuilder {
currentBatchId.get(),
deleteSuccess ? "Successfully" : "Failed to",
sealedFile.right.getPath(),
- fileWriter.getIOWriter().getFile().getPath(),
+ file.getPath(),
deleteSuccess ? "" : "Maybe the tsfile needs to be deleted manually.");
}
sealedFiles.clear();
@@ -186,8 +187,8 @@ public class PipeTreeModelTsFileBuilder extends PipeTsFileBuilder {
throw e;
}
- fileWriter.close();
final File sealedFile = fileWriter.getIOWriter().getFile();
+ fileWriter.close();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Seal tsfile {} successfully.",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
index 81557963822..217adeed43e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java
@@ -106,9 +106,7 @@ public abstract class PipeTsFileBuilder {
return baseDir;
}
throw new PipeException(
- String.format(
- "Failed to create batch file dir %s. (Batch id = %s)",
- baseDir.getPath(), currentBatchId.get()));
+ String.format("Failed to create batch file dir. (Batch id = %s)", currentBatchId.get()));
}
}