This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b89bf1f0166 Pipe: fix the reference count leak of events published in 
the assigner during restart (#14308) (#14342)
b89bf1f0166 is described below

commit b89bf1f0166539369b5b5a7f229f7ac6cb6dc92b
Author: VGalaxies <[email protected]>
AuthorDate: Fri Dec 6 14:53:35 2024 +0800

    Pipe: fix the reference count leak of events published in the assigner 
during restart (#14308) (#14342)
---
 .../realtime/assigner/DisruptorQueue.java          | 45 ++++++++---------
 .../realtime/assigner/PipeDataRegionAssigner.java  | 59 ++++++++++++++++------
 2 files changed, 64 insertions(+), 40 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
index 0a576142a39..6ea51fbe4b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/DisruptorQueue.java
@@ -22,10 +22,8 @@ package 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
 import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
-import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 
@@ -35,6 +33,8 @@ import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
+import java.util.function.Consumer;
+
 import static 
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
 
 public class DisruptorQueue {
@@ -46,9 +46,11 @@ public class DisruptorQueue {
   private final Disruptor<EventContainer> disruptor;
   private final RingBuffer<EventContainer> ringBuffer;
 
-  private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
+  private volatile boolean isClosed = false;
 
-  public DisruptorQueue(final EventHandler<PipeRealtimeEvent> eventHandler) {
+  public DisruptorQueue(
+      final EventHandler<PipeRealtimeEvent> eventHandler,
+      final Consumer<PipeRealtimeEvent> onAssignedHook) {
     final PipeConfig config = PipeConfig.getInstance();
     final int ringBufferSize = 
config.getPipeExtractorAssignerDisruptorRingBufferSize();
     final long ringBufferEntrySizeInBytes =
@@ -71,9 +73,9 @@ public class DisruptorQueue {
             new BlockingWaitStrategy());
     disruptor.handleEventsWith(
         (container, sequence, endOfBatch) -> {
-          eventHandler.onEvent(container.getEvent(), sequence, endOfBatch);
-          EnrichedEvent innerEvent = container.getEvent().getEvent();
-          eventCounter.decreaseEventCount(innerEvent);
+          final PipeRealtimeEvent realtimeEvent = container.getEvent();
+          eventHandler.onEvent(realtimeEvent, sequence, endOfBatch);
+          onAssignedHook.accept(realtimeEvent);
         });
     disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
 
@@ -81,19 +83,24 @@ public class DisruptorQueue {
   }
 
   public void publish(final PipeRealtimeEvent event) {
-    final EnrichedEvent internalEvent = event.getEvent();
-    if (internalEvent instanceof PipeHeartbeatEvent) {
-      ((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
+    final EnrichedEvent innerEvent = event.getEvent();
+    if (innerEvent instanceof PipeHeartbeatEvent) {
+      ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
     }
     ringBuffer.publishEvent((container, sequence, o) -> 
container.setEvent(event), event);
-    eventCounter.increaseEventCount(internalEvent);
   }
 
-  public void clear() {
-    disruptor.halt();
+  public void shutdown() {
+    isClosed = true;
+    // use shutdown instead of halt to ensure all published events have been 
handled
+    disruptor.shutdown();
     allocatedMemoryBlock.close();
   }
 
+  public boolean isClosed() {
+    return isClosed;
+  }
+
   private static class EventContainer {
 
     private PipeRealtimeEvent event;
@@ -108,16 +115,4 @@ public class DisruptorQueue {
       this.event = event;
     }
   }
-
-  public int getTabletInsertionEventCount() {
-    return eventCounter.getTabletInsertionEventCount();
-  }
-
-  public int getTsFileInsertionEventCount() {
-    return eventCounter.getTsFileInsertionEventCount();
-  }
-
-  public int getPipeHeartbeatEventCount() {
-    return eventCounter.getPipeHeartbeatEventCount();
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index d3a7f44e458..c5f08634df4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
@@ -32,6 +33,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSchemaPatternMatcher;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher;
 import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
+import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import org.slf4j.Logger;
@@ -63,13 +65,15 @@ public class PipeDataRegionAssigner implements Closeable {
   private final AtomicReference<ProgressIndex> 
maxProgressIndexForTsFileInsertionEvent =
       new AtomicReference<>(MinimumProgressIndex.INSTANCE);
 
+  private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
+
   public String getDataRegionId() {
     return dataRegionId;
   }
 
   public PipeDataRegionAssigner(final String dataRegionId) {
     this.matcher = new CachedSchemaPatternMatcher();
-    this.disruptor = new DisruptorQueue(this::assignToExtractor);
+    this.disruptor = new DisruptorQueue(this::assignToExtractor, 
this::onAssignedHook);
     this.dataRegionId = dataRegionId;
     PipeAssignerMetrics.getInstance().register(this);
   }
@@ -81,19 +85,44 @@ public class PipeDataRegionAssigner implements Closeable {
       return;
     }
 
-    disruptor.publish(event);
+    final EnrichedEvent innerEvent = event.getEvent();
+    eventCounter.increaseEventCount(innerEvent);
+    if (innerEvent instanceof PipeHeartbeatEvent) {
+      ((PipeHeartbeatEvent) innerEvent).onPublished();
+    }
+
+    if (!disruptor.isClosed()) {
+      disruptor.publish(event);
+    } else {
+      onAssignedHook(event);
+    }
+  }
+
+  private void onAssignedHook(final PipeRealtimeEvent realtimeEvent) {
+    realtimeEvent.gcSchemaInfo();
+    
realtimeEvent.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(), 
false);
 
-    if (event.getEvent() instanceof PipeHeartbeatEvent) {
-      ((PipeHeartbeatEvent) event.getEvent()).onPublished();
+    final EnrichedEvent innerEvent = realtimeEvent.getEvent();
+    eventCounter.decreaseEventCount(innerEvent);
+    if (innerEvent instanceof PipeHeartbeatEvent) {
+      ((PipeHeartbeatEvent) innerEvent).onAssigned();
     }
   }
 
-  public void assignToExtractor(
+  private void assignToExtractor(
       final PipeRealtimeEvent event, final long sequence, final boolean 
endOfBatch) {
+    if (disruptor.isClosed()) {
+      return;
+    }
+
     matcher
         .match(event)
         .forEach(
             extractor -> {
+              if (disruptor.isClosed()) {
+                return;
+              }
+
               if (event.getEvent().isGeneratedByPipe() && 
!extractor.isForwardingPipeRequests()) {
                 // The frequency of progress reports is limited by the 
counter, while progress
                 // reports to TsFileInsertionEvent are not limited.
@@ -148,13 +177,7 @@ public class PipeDataRegionAssigner implements Closeable {
                 return;
               }
               extractor.extract(copiedEvent);
-
-              if (innerEvent instanceof PipeHeartbeatEvent) {
-                ((PipeHeartbeatEvent) innerEvent).onAssigned();
-              }
             });
-    event.gcSchemaInfo();
-    event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(), 
false);
   }
 
   private void bindOrUpdateProgressIndexForTsFileInsertionEvent(
@@ -196,18 +219,24 @@ public class PipeDataRegionAssigner implements Closeable {
   public void close() {
     PipeAssignerMetrics.getInstance().deregister(dataRegionId);
     matcher.clear();
-    disruptor.clear();
+
+    final long startTime = System.currentTimeMillis();
+    disruptor.shutdown();
+    LOGGER.info(
+        "Pipe: Assigner on data region {} shutdown internal disruptor within 
{} ms",
+        dataRegionId,
+        System.currentTimeMillis() - startTime);
   }
 
   public int getTabletInsertionEventCount() {
-    return disruptor.getTabletInsertionEventCount();
+    return eventCounter.getTabletInsertionEventCount();
   }
 
   public int getTsFileInsertionEventCount() {
-    return disruptor.getTsFileInsertionEventCount();
+    return eventCounter.getTsFileInsertionEventCount();
   }
 
   public int getPipeHeartbeatEventCount() {
-    return disruptor.getPipeHeartbeatEventCount();
+    return eventCounter.getPipeHeartbeatEventCount();
   }
 }

Reply via email to