This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 199342da8df Pipe: clear event reference count instead of decreasing 
when closing to improve idempotence & add creation time for committer key to 
avoid outdated commit & detect re-increasing reference count to avoid NPE 
(#12315)
199342da8df is described below

commit 199342da8df903ffa7ba821fd94fa4e17bbac11a
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Apr 12 23:59:53 2024 +0800

    Pipe: clear event reference count instead of decreasing when closing to 
improve idempotence & add creation time for committer key to avoid outdated 
commit & detect re-increasing reference count to avoid NPE (#12315)
---
 .../pipe/execution/PipeConfigNodeSubtask.java      |  9 ++----
 .../builder/PipeTransferBatchReqBuilder.java       | 37 ++++++++++++++++------
 .../async/IoTDBDataRegionAsyncConnector.java       | 26 +++++++++++++++
 .../PipeTransferTabletBatchEventHandler.java       | 12 +++++--
 .../PipeTransferTabletInsertionEventHandler.java   |  5 ---
 .../PipeTransferTsFileInsertionEventHandler.java   |  2 --
 .../db/pipe/resource/memory/PipeMemoryBlock.java   |  7 +++-
 .../pipe/task/connection/PipeEventCollector.java   |  8 +++--
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  3 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  2 +-
 .../subtask/connector/PipeConnectorSubtask.java    |  6 ++--
 .../connector/PipeConnectorSubtaskManager.java     | 10 ++++--
 .../subtask/processor/PipeProcessorSubtask.java    |  5 +--
 .../task/stage/SubscriptionTaskConnectorStage.java |  2 +-
 .../SubscriptionConnectorSubtaskManager.java       | 10 ++++--
 .../iotdb/commons/pipe/event/EnrichedEvent.java    | 24 +++++++++++++-
 .../pipe/progress/PipeEventCommitManager.java      | 26 +++++++++------
 .../commons/pipe/progress/PipeEventCommitter.java  | 15 +++++++--
 .../task/subtask/PipeAbstractConnectorSubtask.java |  2 +-
 .../pipe/task/subtask/PipeReportableSubtask.java   |  2 +-
 .../commons/pipe/task/subtask/PipeSubtask.java     | 18 +++++++----
 21 files changed, 168 insertions(+), 63 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index bbe0ead0edd..376548b15d8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -150,7 +150,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
 
       outputPipeConnector.transfer(event);
 
-      releaseLastEvent(true);
+      decreaseReferenceCountAndReleaseLastEvent(true);
     } catch (PipeException e) {
       if (!isClosed.get()) {
         throw e;
@@ -159,7 +159,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
             "{} in pipe transfer, ignored because pipe is dropped.",
             e.getClass().getSimpleName(),
             e);
-        releaseLastEvent(false);
+        clearReferenceCountAndReleaseLastEvent();
       }
     } catch (Exception e) {
       if (!isClosed.get()) {
@@ -169,16 +169,13 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
             e);
       } else {
         LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.", e);
-        releaseLastEvent(false);
+        clearReferenceCountAndReleaseLastEvent();
       }
     }
 
     return true;
   }
 
-  // synchronized for close() and releaseLastEvent(). make sure that the 
lastEvent
-  // will not be updated after pipeProcessor.close() to avoid resource leak
-  // because of the lastEvent is not released.
   @Override
   public void close() {
     isClosed.set(true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 0ea50ae73ae..225f39c149c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
@@ -70,6 +71,8 @@ public abstract class PipeTransferBatchReqBuilder implements 
AutoCloseable {
   protected final PipeMemoryBlock allocatedMemoryBlock;
   protected long totalBufferSize = 0;
 
+  private final AtomicBoolean isClosed = new AtomicBoolean(true);
+
   protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
     maxDelayInMs =
         parameters.getIntOrDefault(
@@ -104,6 +107,8 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
           requestMaxBatchSizeInBytes,
           getMaxBatchSizeInBytes());
     }
+
+    isClosed.set(false);
   }
 
   /**
@@ -123,17 +128,23 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
     // The deduplication logic here is to avoid the accumulation of the same 
event in a batch when
     // retrying.
     if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
-      events.add(event);
-      requestCommitIds.add(requestCommitId);
-      final int bufferSize = buildTabletInsertionBuffer(event);
-
-      ((EnrichedEvent) 
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
-
-      if (firstEventProcessingTime == Long.MIN_VALUE) {
-        firstEventProcessingTime = System.currentTimeMillis();
+      // We increase the reference count for this event to determine if the 
event may be released.
+      if (((EnrichedEvent) event)
+          
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
+        events.add(event);
+        requestCommitIds.add(requestCommitId);
+
+        final int bufferSize = buildTabletInsertionBuffer(event);
+        totalBufferSize += bufferSize;
+
+        if (firstEventProcessingTime == Long.MIN_VALUE) {
+          firstEventProcessingTime = System.currentTimeMillis();
+        }
+      } else {
+        LOGGER.error(
+            "TabletInsertionEvent {} can not be transferred because the 
reference count can not be increased, the data represented by this event is 
lost",
+            ((EnrichedEvent) event).coreReportMessage());
       }
-
-      totalBufferSize += bufferSize;
     }
 
     return totalBufferSize >= getMaxBatchSizeInBytes()
@@ -203,6 +214,8 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
 
   @Override
   public synchronized void close() {
+    isClosed.set(true);
+
     for (final Event event : events) {
       if (event instanceof EnrichedEvent) {
         ((EnrichedEvent) event).clearReferenceCount(this.getClass().getName());
@@ -210,4 +223,8 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
     }
     allocatedMemoryBlock.close();
   }
+
+  public boolean isClosed() {
+    return isClosed.get();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index dd6b002c523..2607415325f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -170,6 +170,15 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
             (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+        // We increase the reference count for this event to determine if the 
event may be released.
+        if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+            IoTDBDataRegionAsyncConnector.class.getName())) {
+          LOGGER.error(
+              "PipeInsertNodeTabletInsertionEvent {} can not be transferred 
because the reference count can not be increased, the data represented by this 
event is lost",
+              pipeInsertNodeTabletInsertionEvent.coreReportMessage());
+          return;
+        }
+
         final InsertNode insertNode =
             
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
         final TPipeTransferReq pipeTransferReq =
@@ -185,6 +194,15 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
         final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
             (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+        // We increase the reference count for this event to determine if the 
event may be released.
+        if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+            IoTDBDataRegionAsyncConnector.class.getName())) {
+          LOGGER.error(
+              "PipeRawTabletInsertionEvent {} can not be transferred because 
the reference count can not be increased, the data represented by this event is 
lost",
+              pipeRawTabletInsertionEvent.coreReportMessage());
+          return;
+        }
+
         final PipeTransferTabletRawReq pipeTransferTabletRawReq =
             PipeTransferTabletRawReq.toTPipeTransferReq(
                 pipeRawTabletInsertionEvent.convertToTablet(),
@@ -245,6 +263,14 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
     final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
         (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+        IoTDBDataRegionAsyncConnector.class.getName())) {
+      LOGGER.error(
+          "PipeTsFileInsertionEvent {} can not be transferred because the 
reference count can not be increased, the data represented by this event is 
lost",
+          pipeTsFileInsertionEvent.coreReportMessage());
+      return;
+    }
 
     // Just in case. To avoid the case that exception occurred when 
constructing the handler.
     if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index f45c012bb1b..668757201f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -49,6 +49,7 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
   private final TPipeTransferReq req;
 
   private final IoTDBDataRegionAsyncConnector connector;
+  private final IoTDBThriftAsyncPipeTransferBatchReqBuilder batchReqBuilder;
 
   public PipeTransferTabletBatchEventHandler(
       IoTDBThriftAsyncPipeTransferBatchReqBuilder batchBuilder,
@@ -60,6 +61,7 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
     req = batchBuilder.toTPipeTransferReq();
 
     this.connector = connector;
+    this.batchReqBuilder = batchBuilder;
   }
 
   public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException {
@@ -83,10 +85,16 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
             .statusHandler()
             .handle(status, response.getStatus().getMessage(), 
events.toString());
       }
+      final boolean isClosed = batchReqBuilder.isClosed();
       for (final Event event : events) {
         if (event instanceof EnrichedEvent) {
-          ((EnrichedEvent) event)
-              
.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(), 
true);
+          if (isClosed) {
+            ((EnrichedEvent) event)
+                
.clearReferenceCount(PipeTransferTabletBatchEventHandler.class.getName());
+          } else {
+            ((EnrichedEvent) event)
+                
.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(), 
true);
+          }
         }
       }
     } catch (Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 8eb32d5a746..5d834440d7f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -50,11 +50,6 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
     this.event = event;
     this.req = req;
     this.connector = connector;
-
-    if (this.event instanceof EnrichedEvent) {
-      ((EnrichedEvent) this.event)
-          
.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
-    }
   }
 
   public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 75fa051fe2b..467405cf82a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -91,8 +91,6 @@ public class PipeTransferTsFileInsertionEventHandler
             : new RandomAccessFile(tsFile, "r");
 
     isSealSignalSent = new AtomicBoolean(false);
-
-    
event.increaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName());
   }
 
   public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException, IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 98947caab56..06dfc0f64a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -160,7 +160,12 @@ public class PipeMemoryBlock implements AutoCloseable {
 
   @Override
   public String toString() {
-    return "PipeMemoryBlock{" + "memoryUsageInBytes=" + 
memoryUsageInBytes.get() + '}';
+    return "PipeMemoryBlock{"
+        + "memoryUsageInBytes="
+        + memoryUsageInBytes.get()
+        + ", isReleased="
+        + isReleased
+        + '}';
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index bc6f1bb6291..730cc5bb661 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -46,14 +46,18 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
 
   private final EnrichedDeque<Event> bufferQueue;
 
+  private final long creationTime;
+
   private final int regionId;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
 
-  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue, 
int regionId) {
+  public PipeEventCollector(
+      BoundedBlockingPendingQueue<Event> pendingQueue, long creationTime, int 
regionId) {
     this.pendingQueue = pendingQueue;
+    this.creationTime = creationTime;
     this.regionId = regionId;
     bufferQueue = new EnrichedDeque<>(new LinkedList<>());
   }
@@ -129,7 +133,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
 
       // Assign a commit id for this event in order to report progress in 
order.
       PipeEventCommitManager.getInstance()
-          .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, regionId);
+          .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, 
creationTime, regionId);
     }
     if (event instanceof PipeHeartbeatEvent) {
       ((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index becb0c78dd7..001978ebe90 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -79,7 +79,8 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
 
   @Override
   public void dropSubtask() throws PipeException {
-    PipeConnectorSubtaskManager.instance().deregister(pipeName, regionId, 
connectorSubtaskId);
+    PipeConnectorSubtaskManager.instance()
+        .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
   }
 
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 2540debdd8d..d88bfb0530c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -98,7 +98,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     // old one, so we need creationTime to make their hash code different in 
the map.
     final String taskId = pipeName + "_" + regionId + "_" + creationTime;
     final PipeEventCollector pipeConnectorOutputEventCollector =
-        new PipeEventCollector(pipeConnectorOutputPendingQueue, regionId);
+        new PipeEventCollector(pipeConnectorOutputPendingQueue, creationTime, 
regionId);
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 0f0bdeee689..c45f2b6aede 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -113,7 +113,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
                 : event);
       }
 
-      releaseLastEvent(true);
+      decreaseReferenceCountAndReleaseLastEvent(true);
     } catch (PipeException e) {
       if (!isClosed.get()) {
         throw e;
@@ -122,7 +122,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
             "{} in pipe transfer, ignored because pipe is dropped.",
             e.getClass().getSimpleName(),
             e);
-        releaseLastEvent(false);
+        clearReferenceCountAndReleaseLastEvent();
       }
     } catch (Exception e) {
       if (!isClosed.get()) {
@@ -137,7 +137,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
             e);
       } else {
         LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.", e);
-        releaseLastEvent(false);
+        clearReferenceCountAndReleaseLastEvent();
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 124260a434e..968c73b4c5d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -65,7 +65,11 @@ public class PipeConnectorSubtaskManager {
             // for matching in `CONNECTOR_CONSTRUCTORS`
             .toLowerCase();
     PipeEventCommitManager.getInstance()
-        .register(environment.getPipeName(), environment.getRegionId(), 
connectorKey);
+        .register(
+            environment.getPipeName(),
+            environment.getCreationTime(),
+            environment.getRegionId(),
+            connectorKey);
 
     final boolean isDataRegionConnector =
         StorageEngine.getInstance()
@@ -145,7 +149,7 @@ public class PipeConnectorSubtaskManager {
   }
 
   public synchronized void deregister(
-      String pipeName, int dataRegionId, String attributeSortedString) {
+      String pipeName, long creationTime, int dataRegionId, String 
attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
@@ -158,7 +162,7 @@ public class PipeConnectorSubtaskManager {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
 
-    PipeEventCommitManager.getInstance().deregister(pipeName, dataRegionId);
+    PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
dataRegionId);
   }
 
   public synchronized void start(String attributeSortedString) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 6645f2c3d05..021099cac91 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -142,7 +142,8 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
               outputEventCollector);
         }
       }
-      releaseLastEvent(!isClosed.get() && 
outputEventCollector.hasNoCollectInvocationAfterReset());
+      decreaseReferenceCountAndReleaseLastEvent(
+          !isClosed.get() && 
outputEventCollector.hasNoCollectInvocationAfterReset());
     } catch (PipeRuntimeOutOfMemoryCriticalException e) {
       LOGGER.info(
           "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.",
@@ -157,7 +158,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
             e);
       } else {
         LOGGER.info("Exception in pipe event processing, ignored because pipe 
is dropped.", e);
-        releaseLastEvent(false);
+        clearReferenceCountAndReleaseLastEvent();
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
index 1bbcc153beb..e5f965f7176 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
@@ -67,7 +67,7 @@ public class SubscriptionTaskConnectorStage extends 
PipeTaskConnectorStage {
   @Override
   public void dropSubtask() throws PipeException {
     SubscriptionConnectorSubtaskManager.instance()
-        .deregister(pipeName, regionId, connectorSubtaskId);
+        .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
   }
 
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index ef4c7e578b1..6627bf2948a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -73,7 +73,11 @@ public class SubscriptionConnectorSubtaskManager {
     }
 
     PipeEventCommitManager.getInstance()
-        .register(environment.getPipeName(), environment.getRegionId(), 
connectorKey);
+        .register(
+            environment.getPipeName(),
+            environment.getCreationTime(),
+            environment.getRegionId(),
+            connectorKey);
 
     String attributeSortedString = new 
TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
     attributeSortedString = "__subscription_" + attributeSortedString;
@@ -137,7 +141,7 @@ public class SubscriptionConnectorSubtaskManager {
   }
 
   public synchronized void deregister(
-      String pipeName, int dataRegionId, String attributeSortedString) {
+      String pipeName, long creationTime, int dataRegionId, String 
attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
@@ -148,7 +152,7 @@ public class SubscriptionConnectorSubtaskManager {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
 
-    PipeEventCommitManager.getInstance().deregister(pipeName, dataRegionId);
+    PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, 
dataRegionId);
   }
 
   public synchronized void start(String attributeSortedString) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 3a147a38682..1f346c8f72d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -41,6 +42,7 @@ public abstract class EnrichedEvent implements Event {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(EnrichedEvent.class);
 
   protected final AtomicInteger referenceCount;
+  protected final AtomicBoolean isReleased;
 
   protected final String pipeName;
   protected final PipeTaskMeta pipeTaskMeta;
@@ -66,6 +68,7 @@ public abstract class EnrichedEvent implements Event {
       long startTime,
       long endTime) {
     referenceCount = new AtomicInteger(0);
+    isReleased = new AtomicBoolean(false);
     this.pipeName = pipeName;
     this.pipeTaskMeta = pipeTaskMeta;
     this.pipePattern = pipePattern;
@@ -87,6 +90,13 @@ public abstract class EnrichedEvent implements Event {
   public boolean increaseReferenceCount(String holderMessage) {
     boolean isSuccessful = true;
     synchronized (this) {
+      if (isReleased.get()) {
+        LOGGER.warn(
+            "re-increase reference count to event that has already been 
released: {}, stack trace: {}",
+            coreReportMessage(),
+            Thread.currentThread().getStackTrace());
+        return false;
+      }
       if (referenceCount.get() == 0) {
         isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage);
       }
@@ -127,8 +137,15 @@ public abstract class EnrichedEvent implements Event {
         PipeEventCommitManager.getInstance().commit(this, committerKey);
       }
       final int newReferenceCount = referenceCount.decrementAndGet();
+      if (newReferenceCount == 0) {
+        isReleased.set(true);
+      }
       if (newReferenceCount < 0) {
-        LOGGER.warn("reference count is decreased to {}.", newReferenceCount);
+        LOGGER.warn(
+            "reference count is decreased to {}, event: {}, stack trace: {}",
+            newReferenceCount,
+            coreReportMessage(),
+            Thread.currentThread().getStackTrace());
       }
     }
     return isSuccessful;
@@ -150,6 +167,7 @@ public abstract class EnrichedEvent implements Event {
         isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
       }
       referenceCount.set(0);
+      isReleased.set(true);
     }
     return isSuccessful;
   }
@@ -289,6 +307,8 @@ public abstract class EnrichedEvent implements Event {
     return "EnrichedEvent{"
         + "referenceCount="
         + referenceCount.get()
+        + ", isReleased="
+        + isReleased.get()
         + ", pipeName='"
         + pipeName
         + "', pipeTaskMeta="
@@ -316,6 +336,8 @@ public abstract class EnrichedEvent implements Event {
     return "EnrichedEvent{"
         + "referenceCount="
         + referenceCount.get()
+        + ", isReleased="
+        + isReleased.get()
         + ", pipeName='"
         + pipeName
         + "', committerKey='"
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
index 89eb7b2868c..bb985248854 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
@@ -35,25 +35,25 @@ public class PipeEventCommitManager {
   // key: pipeName_dataRegionId
   private final Map<String, PipeEventCommitter> eventCommitterMap = new ConcurrentHashMap<>();
 
-  public void register(String pipeName, int regionId, String pipePluginName) {
+  public void register(String pipeName, long creationTime, int regionId, String pipePluginName) {
     if (pipeName == null || pipePluginName == null) {
       return;
     }
 
-    final String committerKey = generateCommitterKey(pipeName, regionId);
+    final String committerKey = generateCommitterKey(pipeName, creationTime, regionId);
     if (eventCommitterMap.containsKey(committerKey)) {
       LOGGER.warn(
           "Pipe with same name is already registered on this data region, overwriting: {}",
           committerKey);
     }
-    PipeEventCommitter eventCommitter = new PipeEventCommitter(pipeName, regionId);
+    PipeEventCommitter eventCommitter = new PipeEventCommitter(pipeName, creationTime, regionId);
     eventCommitterMap.put(committerKey, eventCommitter);
     PipeEventCommitMetrics.getInstance().register(eventCommitter, committerKey);
     LOGGER.info("Pipe committer registered for pipe on data region: {}", committerKey);
   }
 
-  public void deregister(String pipeName, int regionId) {
-    final String committerKey = generateCommitterKey(pipeName, regionId);
+  public void deregister(String pipeName, long creationTime, int regionId) {
+    final String committerKey = generateCommitterKey(pipeName, creationTime, regionId);
     eventCommitterMap.remove(committerKey);
     PipeEventCommitMetrics.getInstance().deregister(committerKey);
     LOGGER.info("Pipe committer deregistered for pipe on data region: {}", committerKey);
@@ -63,12 +63,13 @@ public class PipeEventCommitManager {
    * Assign a commit id and a key for commit. Make sure {@code EnrichedEvent.pipeName} is set before
    * calling this.
    */
-  public void enrichWithCommitterKeyAndCommitId(EnrichedEvent event, int regionId) {
+  public void enrichWithCommitterKeyAndCommitId(
+      EnrichedEvent event, long creationTime, int regionId) {
     if (event == null || event.getPipeName() == null || !event.needToCommit()) {
       return;
     }
 
-    final String committerKey = generateCommitterKey(event.getPipeName(), regionId);
+    final String committerKey = generateCommitterKey(event.getPipeName(), creationTime, regionId);
     final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
     if (committer == null) {
       return;
@@ -86,13 +87,20 @@ public class PipeEventCommitManager {
 
     final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
     if (committer == null) {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "missing PipeEventCommitter({}) when commit event: {}, stack trace: {}",
+            committerKey,
+            event.coreReportMessage(),
+            Thread.currentThread().getStackTrace());
+      }
       return;
     }
     committer.commit(event);
   }
 
-  private static String generateCommitterKey(String pipeName, int regionId) {
-    return String.format("%s_%s", pipeName, regionId);
+  private static String generateCommitterKey(String pipeName, long creationTime, int regionId) {
+    return String.format("%s_%s_%s", pipeName, regionId, creationTime);
   }
 
   private PipeEventCommitManager() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
index 04664baf1ea..eeec9c8be25 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
@@ -36,6 +36,7 @@ public class PipeEventCommitter {
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCommitter.class);
 
   private final String pipeName;
+  private final long creationTime;
   private final int dataRegionId;
 
   private final AtomicLong commitIdGenerator = new AtomicLong(0);
@@ -48,9 +49,10 @@ public class PipeEventCommitter {
               event ->
                   Objects.requireNonNull(event, "committable event cannot be null").getCommitId()));
 
-  PipeEventCommitter(String pipeName, int dataRegionId) {
+  PipeEventCommitter(String pipeName, long creationTime, int dataRegionId) {
     // make it package-private
     this.pipeName = pipeName;
+    this.creationTime = creationTime;
     this.dataRegionId = dataRegionId;
   }
 
@@ -66,9 +68,11 @@ public class PipeEventCommitter {
 
       if (e.getCommitId() <= lastCommitId.get()) {
         LOGGER.warn(
-            "commit id must be monotonically increasing, lastCommitId: {}, event: {}",
+            "commit id must be monotonically increasing, current commit id: {}, last commit id: {}, event: {}, stack trace: {}",
+            e.getCommitId(),
             lastCommitId.get(),
-            e);
+            e.coreReportMessage(),
+            Thread.currentThread().getStackTrace());
         commitQueue.poll();
         continue;
       }
@@ -89,6 +93,11 @@ public class PipeEventCommitter {
     return pipeName;
   }
 
+  // TODO: waiting called by PipeEventCommitMetrics
+  public long getCreationTime() {
+    return creationTime;
+  }
+
   public int getDataRegionId() {
     return dataRegionId;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index 32ee1b292f0..4d0bae8e0e9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -90,7 +90,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
 
     if (isClosed.get()) {
       LOGGER.info("onFailure in pipe transfer, ignored because pipe is dropped.", throwable);
-      releaseLastEvent(false);
+      clearReferenceCountAndReleaseLastEvent();
       return;
     }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index b04478b5834..63d807ba347 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -39,7 +39,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
   public synchronized void onFailure(Throwable throwable) {
     if (isClosed.get()) {
       LOGGER.info("onFailure in pipe subtask, ignored because pipe is dropped.", throwable);
-      releaseLastEvent(false);
+      clearReferenceCountAndReleaseLastEvent();
       return;
     }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
index baa018059a4..a3834e94dd0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
@@ -90,7 +90,7 @@ public abstract class PipeSubtask
     return hasAtLeastOneEventProcessed;
   }
 
-  /** Should be synchronized with {@link PipeSubtask#releaseLastEvent} */
+  /** Should be synchronized with {@link PipeSubtask#decreaseReferenceCountAndReleaseLastEvent} */
   protected synchronized void setLastEvent(Event event) {
     lastEvent = event;
   }
@@ -146,15 +146,12 @@ public abstract class PipeSubtask
     return !shouldStopSubmittingSelf.get();
   }
 
-  // synchronized for close() and releaseLastEvent(). make sure that the lastEvent
-  // will not be updated after pipeProcessor.close() to avoid resource leak
-  // because of the lastEvent is not released.
   @Override
   public void close() {
-    releaseLastEvent(false);
+    clearReferenceCountAndReleaseLastEvent();
   }
 
-  protected synchronized void releaseLastEvent(boolean shouldReport) {
+  protected synchronized void decreaseReferenceCountAndReleaseLastEvent(boolean shouldReport) {
     if (lastEvent != null) {
       if (lastEvent instanceof EnrichedEvent) {
         ((EnrichedEvent) lastEvent).decreaseReferenceCount(this.getClass().getName(), shouldReport);
@@ -163,6 +160,15 @@ public abstract class PipeSubtask
     }
   }
 
+  protected synchronized void clearReferenceCountAndReleaseLastEvent() {
+    if (lastEvent != null) {
+      if (lastEvent instanceof EnrichedEvent) {
+        ((EnrichedEvent) lastEvent).clearReferenceCount(this.getClass().getName());
+      }
+      lastEvent = null;
+    }
+  }
+
   public String getTaskID() {
     return taskID;
   }


Reply via email to