This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new ffad66625e3 Pipe: discard batched events before restarting pipes 
(#13284)
ffad66625e3 is described below

commit ffad66625e34cc23080f5f44676382a60d106a51
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Aug 26 11:49:38 2024 +0800

    Pipe: discard batched events before restarting pipes (#13284)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../payload/evolvable/batch/PipeTabletEventBatch.java | 19 ++++++++++++++++++-
 .../evolvable/batch/PipeTransferBatchReqBuilder.java  |  5 +++++
 .../thrift/async/IoTDBDataRegionAsyncConnector.java   |  6 ++----
 .../thrift/sync/IoTDBDataRegionSyncConnector.java     |  5 +++++
 .../task/subtask/connector/PipeConnectorSubtask.java  |  5 +++--
 .../connector/PipeConnectorSubtaskLifeCycle.java      |  2 +-
 .../pipe/connector/protocol/IoTDBConnector.java       |  8 ++++++++
 7 files changed, 42 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index db5ba85d7ba..41135281339 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -115,7 +116,23 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
     events.clear();
   }
 
-  public void decreaseEventsReferenceCount(final String holderMessage, final 
boolean shouldReport) {
+  /**
+   * Discard all events of the given pipe. This method only clears the 
reference count of the events
+   * and discards them, but does not modify other objects (such as buffers) for 
simplicity.
+   */
+  public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+    events.removeIf(
+        event -> {
+          if (pipeNameToDrop.equals(event.getPipeName())) {
+            
event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+            return true;
+          }
+          return false;
+        });
+  }
+
+  public synchronized void decreaseEventsReferenceCount(
+      final String holderMessage, final boolean shouldReport) {
     events.forEach(event -> event.decreaseReferenceCount(holderMessage, 
shouldReport));
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index f28f19e4da1..77fd73f8aba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -184,6 +184,11 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
         && 
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
   }
 
+  public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+    defaultBatch.discardEventsOfPipe(pipeNameToDrop);
+    endPointToBatch.values().forEach(batch -> 
batch.discardEventsOfPipe(pipeNameToDrop));
+  }
+
   @Override
   public synchronized void close() {
     defaultBatch.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 43c5fa6eff2..3f37b247869 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -515,11 +515,9 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
   //////////////////////////// Operations for close 
////////////////////////////
 
-  /**
-   * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
-   * its queued events in the output pipe connector.
-   */
+  @Override
   public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+    tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
     retryEventQueue.removeIf(
         event -> {
           if (event instanceof EnrichedEvent
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index fca40f21d64..a92790c5e80 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -492,6 +492,11 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
     LOGGER.info("Successfully transferred file {}.", tsFile);
   }
 
+  @Override
+  public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+    tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+  }
+
   @Override
   public void close() {
     if (tabletBatchBuilder != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index cc78ebdea54..a832d43f73e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
@@ -268,8 +269,8 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
       }
     }
 
-    if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
-      ((IoTDBDataRegionAsyncConnector) 
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
+    if (outputPipeConnector instanceof IoTDBConnector) {
+      ((IoTDBConnector) 
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index a28a0289b08..e3d4d35a171 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -89,7 +89,7 @@ public class PipeConnectorSubtaskLifeCycle implements 
AutoCloseable {
    *     the {@link PipeConnectorSubtask} should never be used again
    * @throws IllegalStateException if {@link 
PipeConnectorSubtaskLifeCycle#registeredTaskCount} <= 0
    */
-  public synchronized boolean deregister(String pipeNameToDeregister) {
+  public synchronized boolean deregister(final String pipeNameToDeregister) {
     if (registeredTaskCount <= 0) {
       throw new IllegalStateException("registeredTaskCount <= 0");
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 9dc08f93402..55231032e5a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -452,6 +452,14 @@ public abstract class IoTDBConnector implements 
PipeConnector {
     GLOBAL_RATE_LIMITER.acquire(bytesLength);
   }
 
+  /**
+   * When a pipe is dropped, the connector may be reused and will not be 
closed. We need to discard
+   * its batched or queued events in the output pipe connector.
+   */
+  public synchronized void discardEventsOfPipe(final String pipeName) {
+    // Do nothing by default
+  }
+
   public PipeReceiverStatusHandler statusHandler() {
     return receiverStatusHandler;
   }

Reply via email to