This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ee7675ce10 Pipe: make hybrid extractor aware of event collector's 
queue size & block processor subtask's execution when events buffered in 
EventCollector (#11283)
2ee7675ce10 is described below

commit 2ee7675ce101fe4a05aa6770fb34e15cc627653e
Author: Zikun Ma <[email protected]>
AuthorDate: Thu Oct 12 16:36:26 2023 +0800

    Pipe: make hybrid extractor aware of event collector's queue size & block 
processor subtask's execution when events buffered in EventCollector (#11283)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../event/common/heartbeat/PipeHeartbeatEvent.java | 18 ++++++++++++-
 .../PipeRealtimeDataRegionHybridExtractor.java     | 31 ++++++++++++++++------
 .../pipe/resource/wal/PipeWALResourceManager.java  |  4 +++
 .../pipe/task/connection/BlockingPendingQueue.java |  4 +++
 .../pipe/task/connection/PipeEventCollector.java   |  4 +++
 .../subtask/processor/PipeProcessorSubtask.java    | 12 ++++++---
 .../apache/iotdb/commons/conf/CommonConfig.java    | 23 +++++++++-------
 .../iotdb/commons/conf/CommonDescriptor.java       | 11 ++++----
 .../iotdb/commons/pipe/config/PipeConfig.java      | 16 ++++++-----
 9 files changed, 89 insertions(+), 34 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index fd6724088b0..abbfdd8c24d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.connection.EnrichedDeque;
 import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
@@ -39,6 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   private final String dataRegionId;
   private String pipeName;
+  private PipeRealtimeDataRegionExtractor extractor = null;
 
   private long timePublished;
   private long timeAssigned;
@@ -164,9 +167,14 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   public void recordBufferQueueSize(EnrichedDeque<Event> bufferQueue) {
     if (shouldPrintMessage) {
       bufferQueueTabletSize = bufferQueue.getTabletInsertionEventCount();
-      bufferQueueTsFileSize = bufferQueue.getTsFileInsertionEventCount();
       bufferQueueSize = bufferQueue.size();
     }
+
+    bufferQueueTsFileSize = bufferQueue.getTsFileInsertionEventCount();
+    if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
+      ((PipeRealtimeDataRegionHybridExtractor) extractor)
+          .informEventCollectorQueueTsFileSize(bufferQueueTsFileSize);
+    }
   }
 
   public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event> 
pendingQueue) {
@@ -177,6 +185,14 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     }
   }
 
+  /////////////////////////////// For Hybrid extractor 
///////////////////////////////
+
+  public void bindExtractor(PipeRealtimeDataRegionExtractor extractor) {
+    this.extractor = extractor;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
   @Override
   public String toString() {
     final String unknownMessage = "Unknown";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 1ea661eb90c..a452577858f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -21,12 +21,13 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -34,12 +35,15 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegionExtractor {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
   private volatile boolean isStartedToSupply = false;
+  private final AtomicInteger eventCollectorQueueTsFileSize = new 
AtomicInteger(0);
 
   @Override
   protected void doExtract(PipeRealtimeEvent event) {
@@ -72,6 +76,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   private void extractTabletInsertion(PipeRealtimeEvent event) {
     if (!isStartedToSupply
         || mayWalSizeReachThrottleThreshold()
+        || tooManyWALPinned()
         || isTsFileEventCountInQueueExceededLimit()) {
       // In the following cases, we should not extract any more tablet 
events. All the data
       // represented by the tablet events should be carried by the following 
tsfile event:
@@ -159,6 +164,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private void extractHeartbeat(PipeRealtimeEvent event) {
+    // Bind extractor so that the heartbeat event can later inform the 
extractor of queue size
+    ((PipeHeartbeatEvent) event.getEvent()).bindExtractor(this);
+
     // Record the pending queue size before trying to put heartbeatEvent into 
queue
     ((PipeHeartbeatEvent) 
event.getEvent()).recordExtractorQueueSize(pendingQueue);
 
@@ -192,16 +200,23 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
   }
 
   private boolean mayWalSizeReachThrottleThreshold() {
-    final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    // Assume that the max data replica factor in common config is 3.
-    // This can be changed in the future.
-    return 3L * PipeAgent.task().getLeaderDataRegionCount() * 
config.getWalBufferSize()
-        > config.getThrottleThreshold();
+    return 3 * WALManager.getInstance().getTotalDiskUsage()
+        > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+  }
+
+  private boolean tooManyWALPinned() {
+    return PipeResourceManager.wal().getApproximatePinnedWALCount()
+        > Math.max(1, PipeAgent.task().getLeaderDataRegionCount())
+            * 
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
   }
 
   private boolean isTsFileEventCountInQueueExceededLimit() {
-    return pendingQueue.getTsFileInsertionEventCount()
-        >= PipeConfig.getInstance().getPipeExtractorPendingQueueTsFileLimit();
+    return pendingQueue.getTsFileInsertionEventCount() + 
eventCollectorQueueTsFileSize.get()
+        >= 
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
+  }
+
+  public void informEventCollectorQueueTsFileSize(int queueSize) {
+    eventCollectorQueueTsFileSize.set(queueSize);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index afbe6931533..f8202372554 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -87,6 +87,10 @@ public abstract class PipeWALResourceManager {
         TimeUnit.MILLISECONDS);
   }
 
+  public int getApproximatePinnedWALCount() {
+    return memtableIdToPipeWALResourceMap.size();
+  }
+
   public final void pin(final WALEntryHandler walEntryHandler) throws 
IOException {
     final long memtableId = walEntryHandler.getMemTableId();
     final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % 
SEGMENT_LOCK_COUNT)];
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index 035d7ded94d..51d5435ac01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -133,6 +133,10 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
     pendingQueue.forEach(action);
   }
 
+  public boolean isEmpty() {
+    return pendingQueue.isEmpty();
+  }
+
   public int size() {
     return pendingQueue.size();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 428a3b38d15..a341f8f9fa0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -73,6 +73,10 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
     }
   }
 
+  public boolean isBufferQueueEmpty() {
+    return bufferQueue.isEmpty();
+  }
+
   /**
    * Try to collect buffered events into pending queue.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 2814a40b5ae..b5caed5245f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -88,10 +88,14 @@ public class PipeProcessorSubtask extends PipeSubtask {
     final Event event = lastEvent != null ? lastEvent : 
inputEventSupplier.supply();
     // Record the last event for retry when exception occurs
     setLastEvent(event);
-    if (event == null) {
-      // Though there is no event to process, there may still be some buffered 
events
-      // in the outputEventCollector. Return true if there are still buffered 
events,
-      // false otherwise.
+    if (
+    // Though there is no event to process, there may still be some buffered 
events
+    // in the outputEventCollector. Return true if there are still buffered 
events,
+    // false otherwise.
+    event == null
+        // If there are still buffered events, process them first; the newly 
supplied
+        // event will be processed in the next round.
+        || !outputEventCollector.isBufferQueueEmpty()) {
       return outputEventCollector.tryCollectBufferedEvents();
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c96a70d07df..ccd3e8e8c90 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -158,14 +158,14 @@ public class CommonConfig {
   private int pipeSubtaskExecutorMaxThreadNum =
       Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
 
+  private int pipeDataStructureTabletRowSize = 2048;
+
   private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 
10_000;
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 
1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private int pipeExtractorMatcherCacheSize = 1024;
-  private int pipeExtractorPendingQueueTsFileLimit = 3;
-  private int pipeDataStructureTabletRowSize = 2048;
 
   private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
   private int pipeConnectorReadFileBufferSize = 8388608;
@@ -187,6 +187,8 @@ public class CommonConfig {
   private boolean pipeAirGapReceiverEnabled = false;
   private int pipeAirGapReceiverPort = 9780;
 
+  private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 3;
+
   /** Whether to use persistent schema mode. */
   private String schemaEngineMode = "Memory";
 
@@ -545,14 +547,6 @@ public class CommonConfig {
     this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
   }
 
-  public int getPipeExtractorPendingQueueTsFileLimit() {
-    return pipeExtractorPendingQueueTsFileLimit;
-  }
-
-  public void setPipeExtractorPendingQueueTsFileLimit(int 
pipeExtractorPendingQueueTsfileLimit) {
-    this.pipeExtractorPendingQueueTsFileLimit = 
pipeExtractorPendingQueueTsfileLimit;
-  }
-
   public long getPipeConnectorTimeoutMs() {
     return pipeConnectorTimeoutMs;
   }
@@ -727,6 +721,15 @@ public class CommonConfig {
     return pipeAirGapReceiverPort;
   }
 
+  public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
+    return pipeMaxAllowedPendingTsFileEpochPerDataRegion;
+  }
+
+  public void setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
+      int pipeExtractorPendingQueueTsfileLimit) {
+    this.pipeMaxAllowedPendingTsFileEpochPerDataRegion = 
pipeExtractorPendingQueueTsfileLimit;
+  }
+
   public String getSchemaEngineMode() {
     return schemaEngineMode;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ab4be8eacfc..7ea9f561d0a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -311,11 +311,6 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_extractor_matcher_cache_size",
                 String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
-    config.setPipeExtractorPendingQueueTsFileLimit(
-        Integer.parseInt(
-            properties.getProperty(
-                "pipe_extractor_pending_queue_tsfile_limit",
-                
String.valueOf(config.getPipeExtractorPendingQueueTsFileLimit()))));
 
     config.setPipeConnectorTimeoutMs(
         Long.parseLong(
@@ -398,6 +393,12 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_air_gap_receiver_port",
                 Integer.toString(config.getPipeAirGapReceiverPort()))));
+
+    config.setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_max_allowed_pending_tsfile_epoch_per_data_region",
+                
String.valueOf(config.getPipeMaxAllowedPendingTsFileEpochPerDataRegion()))));
   }
 
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index f19f948d6cc..f44dfb60bac 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -81,10 +81,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
   }
 
-  public int getPipeExtractorPendingQueueTsFileLimit() {
-    return COMMON_CONFIG.getPipeExtractorPendingQueueTsFileLimit();
-  }
-
   /////////////////////////////// Connector ///////////////////////////////
 
   public long getPipeConnectorTimeoutMs() {
@@ -155,6 +151,12 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAirGapReceiverPort();
   }
 
+  /////////////////////////////// Hybrid Mode ///////////////////////////////
+
+  public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
+    return COMMON_CONFIG.getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
+  }
+
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConfig.class);
@@ -182,8 +184,6 @@ public class PipeConfig {
         "PipeExtractorAssignerDisruptorRingBufferSize: {}",
         getPipeExtractorAssignerDisruptorRingBufferSize());
     LOGGER.info("PipeExtractorMatcherCacheSize: {}", 
getPipeExtractorMatcherCacheSize());
-    LOGGER.info(
-        "PipeExtractorPendingQueueTsFileLimit: {}", 
getPipeExtractorPendingQueueTsFileLimit());
 
     LOGGER.info("PipeConnectorTimeoutMs: {}", getPipeConnectorTimeoutMs());
     LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
@@ -211,6 +211,10 @@ public class PipeConfig {
 
     LOGGER.info("PipeAirGapReceiverEnabled: {}", 
getPipeAirGapReceiverEnabled());
     LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
+
+    LOGGER.info(
+        "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
+        getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////

Reply via email to