This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-ref-issue
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e03654a418ae8bba0782ab1be4a18de393a77d82
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Aug 5 04:55:24 2023 +0800

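    1. One cause of occasional data loss under extreme load: the Disruptor was
       created with the wrong ProducerType. There are multiple producers (one
       publishing log events, one publishing TsFile events) and they publish
       concurrently, so with ProducerType.SINGLE the ring buffer could overwrite
       slots. The queue now uses ProducerType.MULTI. See
       https://stackoverflow.com/questions/18997398/lmax-disruptor-is-too-slow-in-multi-producer-mode-compared-to-single-producer-mo
    2. Hybrid mode could indeed pin WAL for a long time: some events that were
       ignored on early-exit paths never had their reference count correctly
       decremented to zero.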
---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 97 +++++++++++++---------
 .../PipeRealtimeDataRegionLogExtractor.java        |  3 +-
 .../PipeRealtimeDataRegionTsFileExtractor.java     |  3 +-
 .../realtime/assigner/DisruptorQueue.java          | 84 +++++++------------
 .../realtime/assigner/PipeDataRegionAssigner.java  | 10 +--
 .../db/pipe/resource/wal/PipeWALResource.java      |  4 +
 .../pipe/resource/wal/PipeWALResourceManager.java  | 12 ++-
 7 files changed, 111 insertions(+), 102 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 026a3334d54..9b8ff792be4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -77,31 +77,37 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
       // all the data represented by the tablet events should be carried by 
the following tsfile
       // event.
       event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-      LOGGER.info(
-          "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor "
-              + "is approaching capacity, discard tablet event {}, change 
state of tsfile epoch to {}",
-          event,
-          event.getTsFileEpoch().getState(this));
-
-      // Ignore the tablet event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
-      return;
     }
 
-    if 
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
-        && !pendingQueue.waitedOffer(event)) {
-      // this would not happen, but just in case.
-      // pendingQueue is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
-                  + "has reached capacity, discard tablet event %s, current 
state %s",
-              this, event, event.getTsFileEpoch().getState(this));
-      LOGGER.error(errorMessage);
-      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // Ignore the tablet event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+    switch (state) {
+      case USING_TSFILE:
+        // Ignore the tablet event.
+        
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        break;
+      case EMPTY:
+      case USING_TABLET:
+        if (!pendingQueue.waitedOffer(event)) {
+          // this would not happen, but just in case.
+          // pendingQueue is unbounded, so it should never reach capacity.
+          final String errorMessage =
+              String.format(
+                  "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
+                      + "has reached capacity, discard tablet event %s, 
current state %s",
+                  this, event, event.getTsFileEpoch().getState(this));
+          LOGGER.error(errorMessage);
+          PipeAgent.runtime()
+              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+          // Ignore the tablet event.
+          
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unsupported state %s for hybrid realtime extractor %s",
+                state, PipeRealtimeDataRegionHybridExtractor.class.getName()));
     }
   }
 
@@ -113,19 +119,35 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
             state ->
                 state.equals(TsFileEpoch.State.EMPTY) ? 
TsFileEpoch.State.USING_TSFILE : state);
 
-    if (!pendingQueue.waitedOffer(event)) {
-      // this would not happen, but just in case.
-      // pendingQueue is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
-                  + "has reached capacity, discard TsFile event %s, current 
state %s",
-              this, event, event.getTsFileEpoch().getState(this));
-      LOGGER.error(errorMessage);
-      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // Ignore the tsfile event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+    final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+    switch (state) {
+      case EMPTY:
+      case USING_TSFILE:
+        if (!pendingQueue.waitedOffer(event)) {
+          // this would not happen, but just in case.
+          // pendingQueue is unbounded, so it should never reach capacity.
+          final String errorMessage =
+              String.format(
+                  "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
+                      + "has reached capacity, discard TsFile event %s, 
current state %s",
+                  this, event, event.getTsFileEpoch().getState(this));
+          LOGGER.error(errorMessage);
+          PipeAgent.runtime()
+              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+          // Ignore the tsfile event.
+          
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        }
+        break;
+      case USING_TABLET:
+        // All the tablet events have been extracted, so we can ignore the 
tsfile event.
+        
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unsupported state %s for hybrid realtime extractor %s",
+                state, PipeRealtimeDataRegionHybridExtractor.class.getName()));
     }
   }
 
@@ -154,8 +176,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
                 eventToSupply.getClass(), this));
       }
 
+      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+
       if (suppliedEvent != null) {
-        
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
         return suppliedEvent;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 17891add76d..156c8891710 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -102,8 +102,9 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
+      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+
       if (suppliedEvent != null) {
-        
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
         return suppliedEvent;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index 5e97a338df1..cae2e6f3c1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -102,8 +102,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends 
PipeRealtimeDataRegio
         PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
       }
 
+      
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+
       if (suppliedEvent != null) {
-        
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
         return suppliedEvent;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index 9c6d97901aa..bb5ecf89b6b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -20,80 +20,56 @@
 package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 import com.lmax.disruptor.util.DaemonThreadFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ThreadFactory;
-
-public class DisruptorQueue<E> {
-
-  private Disruptor<Container<E>> disruptor;
-  private RingBuffer<Container<E>> ringBuffer;
-
-  private DisruptorQueue() {}
+public class DisruptorQueue {
+
+  private final Disruptor<EventContainer> disruptor;
+  private final RingBuffer<EventContainer> ringBuffer;
+
+  public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
+    disruptor =
+        new Disruptor<>(
+            EventContainer::new,
+            
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize(),
+            DaemonThreadFactory.INSTANCE, // TODO
+            ProducerType.MULTI,
+            new BlockingWaitStrategy());
+    disruptor.handleEventsWith(
+        (container, sequence, endOfBatch) ->
+            eventHandler.onEvent(container.getEvent(), sequence, endOfBatch));
+    disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
+
+    ringBuffer = disruptor.start();
+  }
 
-  public void publish(E obj) {
-    ringBuffer.publishEvent((container, sequence, o) -> container.setObj(o), 
obj);
+  public void publish(PipeRealtimeEvent event) {
+    ringBuffer.publishEvent((container, sequence, o) -> 
container.setEvent(event), event);
   }
 
   public void clear() {
     disruptor.halt();
   }
 
-  public static class Builder<E> {
-    private int ringBufferSize =
-        
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize();
-    private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
-    private ProducerType producerType = ProducerType.MULTI;
-    private WaitStrategy waitStrategy = new BlockingWaitStrategy();
-    private final List<EventHandler<E>> handlers = new ArrayList<>();
-
-    public Builder<E> setProducerType(ProducerType producerType) {
-      this.producerType = producerType;
-      return this;
-    }
-
-    public Builder<E> addEventHandler(EventHandler<E> eventHandler) {
-      this.handlers.add(eventHandler);
-      return this;
-    }
-
-    public DisruptorQueue<E> build() {
-      DisruptorQueue<E> disruptorQueue = new DisruptorQueue<>();
-      disruptorQueue.disruptor =
-          new Disruptor<>(
-              Container::new, ringBufferSize, threadFactory, producerType, 
waitStrategy);
-      for (EventHandler<E> handler : handlers) {
-        disruptorQueue.disruptor.handleEventsWith(
-            (container, sequence, endOfBatch) ->
-                handler.onEvent(container.getObj(), sequence, endOfBatch));
-      }
-      disruptorQueue.disruptor.setDefaultExceptionHandler(new 
DisruptorQueueExceptionHandler());
-      disruptorQueue.disruptor.start();
-      disruptorQueue.ringBuffer = disruptorQueue.disruptor.getRingBuffer();
-      return disruptorQueue;
-    }
-  }
+  private static class EventContainer {
 
-  private static class Container<E> {
-    private E obj;
+    private PipeRealtimeEvent event;
 
-    private Container() {}
+    private EventContainer() {}
 
-    public E getObj() {
-      return obj;
+    public PipeRealtimeEvent getEvent() {
+      return event;
     }
 
-    public void setObj(E obj) {
-      this.obj = obj;
+    public void setEvent(PipeRealtimeEvent event) {
+      this.event = event;
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index d5ca1a10b90..ee569e9e4ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -24,23 +24,17 @@ import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtract
 import 
org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.matcher.PipeDataRegionMatcher;
 
-import com.lmax.disruptor.dsl.ProducerType;
-
 public class PipeDataRegionAssigner {
 
   /** The matcher is used to match the event with the extractor based on the 
pattern. */
   private final PipeDataRegionMatcher matcher;
 
   /** The disruptor is used to assign the event to the extractor. */
-  private final DisruptorQueue<PipeRealtimeEvent> disruptor;
+  private final DisruptorQueue disruptor;
 
   public PipeDataRegionAssigner() {
     this.matcher = new CachedSchemaPatternMatcher();
-    this.disruptor =
-        new DisruptorQueue.Builder<PipeRealtimeEvent>()
-            .setProducerType(ProducerType.SINGLE)
-            .addEventHandler(this::assignToExtractor)
-            .build();
+    this.disruptor = new DisruptorQueue(this::assignToExtractor);
   }
 
   public void publishToAssign(PipeRealtimeEvent event) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 085c71d5f9c..358e319dafa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -162,4 +162,8 @@ public abstract class PipeWALResource implements Closeable {
 
     referenceCount.set(0);
   }
+
+  public int getReferenceCount() {
+    return referenceCount.get();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 64cd83f1a67..afbe6931533 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
@@ -34,6 +37,8 @@ import java.util.concurrent.locks.ReentrantLock;
 
 public abstract class PipeWALResourceManager {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeWALResourceManager.class);
+
   protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
 
   private static final int SEGMENT_LOCK_COUNT = 32;
@@ -55,7 +60,7 @@ public abstract class PipeWALResourceManager {
     ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
         PIPE_WAL_RESOURCE_TTL_CHECKER,
         () -> {
-          Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+          final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
               memtableIdToPipeWALResourceMap.entrySet().iterator();
           while (iterator.hasNext()) {
             final Map.Entry<Long, PipeWALResource> entry = iterator.next();
@@ -66,6 +71,11 @@ public abstract class PipeWALResourceManager {
             try {
               if (entry.getValue().invalidateIfPossible()) {
                 iterator.remove();
+              } else {
+                LOGGER.info(
+                    "WAL (memtableId {}) is still referenced {} times",
+                    entry.getKey(),
+                    entry.getValue().getReferenceCount());
               }
             } finally {
               lock.unlock();

Reply via email to