This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch Fix-drop
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b1dd7ea0363f47113e647a3228c06c88d38f3eef
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 22 16:44:16 2026 +0800

    fix
---
 .../agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java  | 3 ++-
 .../iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java      | 3 +--
 .../db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java  | 3 +--
 .../pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java   | 3 +--
 .../db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java   | 3 +--
 .../commons/pipe/agent/task/connection/BlockingPendingQueue.java    | 6 +++---
 6 files changed, 9 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index bea5fcfe854..8641dbc7867 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -364,7 +364,8 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
     super.discardEventsOfPipe(committerKey);
     tsfileInsertEventDeque.removeIf(
         event -> {
-          if (event instanceof EnrichedEvent && 
isEventFromPipe((EnrichedEvent) event, committerKey)) {
+          if (event instanceof EnrichedEvent
+              && isEventFromPipe((EnrichedEvent) event, committerKey)) {
             if (((EnrichedEvent) event)
                 
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index b633efe3129..90d325f6d23 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -257,8 +257,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     return committerKey.getPipeName().equals(event.getPipeName())
         && committerKey.getCreationTime() == event.getCreationTime()
         && committerKey.getRegionId() == event.getRegionId()
-        && (committerKey.getRestartTimes() < 0
-            || committerKey.equals(event.getCommitterKey()));
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
   }
 
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 4e2189dd5a1..aede0e994d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -177,8 +177,7 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
     return committerKey.getPipeName().equals(event.getPipeName())
         && committerKey.getCreationTime() == event.getCreationTime()
         && committerKey.getRegionId() == event.getRegionId()
-        && (committerKey.getRestartTimes() < 0
-            || committerKey.equals(event.getCommitterKey()));
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
   }
 
   public synchronized void decreaseEventsReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index b542627f942..32a2c191048 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -880,8 +880,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     return committerKey.getPipeName().equals(event.getPipeName())
         && committerKey.getCreationTime() == event.getCreationTime()
         && committerKey.getRegionId() == event.getRegionId()
-        && (committerKey.getRestartTimes() < 0
-            || committerKey.equals(event.getCommitterKey()));
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 3702da3501a..baddf4727d6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -529,8 +529,7 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     return committerKey.getPipeName().equals(event.getPipeName())
         && committerKey.getCreationTime() == event.getCreationTime()
         && committerKey.getRegionId() == event.getRegionId()
-        && (committerKey.getRestartTimes() < 0
-            || committerKey.equals(event.getCommitterKey()));
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
   }
 
   private boolean isQueueAvailable(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 9ae5f3873dd..c430e3f6b06 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -144,7 +144,8 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
     droppedPipeTaskKeys.add(committerKey);
     pendingQueue.removeIf(
         event -> {
-          if (event instanceof EnrichedEvent && 
isEventFromPipe((EnrichedEvent) event, committerKey)) {
+          if (event instanceof EnrichedEvent
+              && isEventFromPipe((EnrichedEvent) event, committerKey)) {
             if (((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
               eventCounter.decreaseEventCount(event);
             }
@@ -197,8 +198,7 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
     return committerKey.getPipeName().equals(event.getPipeName())
         && committerKey.getCreationTime() == event.getCreationTime()
         && committerKey.getRegionId() == event.getRegionId()
-        && (committerKey.getRestartTimes() < 0
-            || committerKey.equals(event.getCommitterKey()));
+        && (committerKey.getRestartTimes() < 0 || 
committerKey.equals(event.getCommitterKey()));
   }
 
   protected boolean isEventFromDroppedPipe(final E event) {

Reply via email to