This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3250550f9a6 [IOTDB-6154] Pipe: better algorithm for hybrid mode to 
switch log/tsfile extraction in iotdb-extractor (#11142)
3250550f9a6 is described below

commit 3250550f9a6176859bde1c4dc34020825efbe577
Author: Itami Sho <[email protected]>
AuthorDate: Mon Sep 25 11:00:33 2023 +0800

    [IOTDB-6154] Pipe: better algorithm for hybrid mode to switch log/tsfile 
extraction in iotdb-extractor (#11142)
    
    In the following 3 cases, we should not extract any more tablet events; all
    the data represented by those tablet events should instead be carried by the
    following TsFile event:
    
    1. The historical extractor has not consumed all the data.
    
    2. HybridExtractor first tries to extract in log mode and then chooses log or
    TsFile mode to continue extracting. However, if the total WAL size exceeds the
    maximum WAL size allowed, write operations will be throttled, so we should not
    extract any more tablet events.
    
    3. The number of tsfile events in the pending queue has exceeded the limit.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../apache/iotdb/confignode/manager/IManager.java  |  2 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  4 ++
 .../PipeRealtimeDataRegionHybridExtractor.java     | 35 ++++++++++---
 .../apache/iotdb/db/pipe/task/PipeTaskManager.java | 30 ++++++++++-
 .../connection/UnboundedBlockingPendingQueue.java  | 60 +++++++++++++++++++++-
 .../apache/iotdb/commons/conf/CommonConfig.java    | 19 ++-----
 .../iotdb/commons/conf/CommonDescriptor.java       | 11 ++--
 .../iotdb/commons/pipe/config/PipeConfig.java      | 11 ++--
 8 files changed, 130 insertions(+), 42 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 97cb2dc0ec5..877ddb0255c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -604,7 +604,7 @@ public interface IManager {
   TGetAllPipeInfoResp getAllPipeInfo();
 
   /**
-   * Get RegionId。used for Show cluster slots information in
+   * Get RegionId. used for Show cluster slots information in
    * docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
    *
    * @return TGetRegionIdResp.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 34e99e09fb5..a0cc1350949 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -117,6 +117,10 @@ public class PipeTaskAgent {
 
   ////////////////////////// Pipe Task Management Entry 
//////////////////////////
 
+  public int getLeaderDataRegionCount() {
+    return pipeTaskManager.getLeaderDataRegionCount();
+  }
+
   public synchronized TPushPipeMetaRespExceptionMessage 
handleSinglePipeMetaChanges(
       PipeMeta pipeMetaFromConfigNode) {
     acquireWriteLock();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index c39f58a8874..269e9ae1d6c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
@@ -37,6 +39,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
+  private volatile boolean isStartedToSupply = false;
+
   @Override
   protected void doExtract(PipeRealtimeEvent event) {
     final Event eventToExtract = event.getEvent();
@@ -66,10 +70,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private void extractTabletInsertion(PipeRealtimeEvent event) {
-    if (isApproachingCapacity()) {
-      // if the pending queue is approaching capacity, we should not extract 
any more tablet events.
-      // all the data represented by the tablet events should be carried by 
the following tsfile
-      // event.
+    if (!isStartedToSupply
+        || mayWalSizeReachThrottleThreshold()
+        || isTsFileEventCountInQueueExceededLimit()) {
+      // In the following 3 cases, we should not extract any more tablet 
events. all the data
+      // represented by the tablet events should be carried by the following 
tsfile event:
+      //  1. The historical extractor has not consumed all the data.
+      //  2. HybridExtractor will first try to do extraction in log mode, and 
then choose log or
+      //  tsfile mode to continue extracting, but if (leader data regions num 
* Wal size) > (maximum
+      //  size of wal buffer), the write operation will be throttled, so we 
should not extract any
+      //  more tablet events.
+      //  3. The number of tsfile events in the pending queue has exceeded the 
limit.
       event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
     }
 
@@ -183,13 +194,23 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
     }
   }
 
-  private boolean isApproachingCapacity() {
-    return pendingQueue.size()
-        >= PipeConfig.getInstance().getPipeExtractorPendingQueueTabletLimit();
+  private boolean mayWalSizeReachThrottleThreshold() {
+    final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    // Assume that the max data replica factor in common config is 3.
+    // This can be changed in the future.
+    return 3L * PipeAgent.task().getLeaderDataRegionCount() * 
config.getWalBufferSize()
+        > config.getThrottleThreshold();
+  }
+
+  private boolean isTsFileEventCountInQueueExceededLimit() {
+    return pendingQueue.getTsfileInsertionEventCount()
+        >= PipeConfig.getInstance().getPipeExtractorPendingQueueTsFileLimit();
   }
 
   @Override
   public Event supply() {
+    isStartedToSupply = true;
+
     PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) 
pendingQueue.directPoll();
 
     while (realtimeEvent != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
index 016b2537f9a..6b91ae46efe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
@@ -29,16 +29,33 @@ public class PipeTaskManager {
 
   private final Map<PipeStaticMeta, Map<TConsensusGroupId, PipeTask>> pipeMap 
= new HashMap<>();
 
+  /**
+   * Leader data region count in this data node. We simply update it when 
adding pipe task but not
+   * remove it when removing pipe task. So it may be larger than the actual 
leader data region count
+   * in this data node.
+   */
+  private volatile int leaderDataRegionCount = 0;
+
   /** Add pipe task by pipe static meta and consensus group id. */
   public synchronized void addPipeTask(
       PipeStaticMeta pipeStaticMeta, TConsensusGroupId consensusGroupId, 
PipeTask pipeTask) {
-    pipeMap.computeIfAbsent(pipeStaticMeta, k -> new 
HashMap<>()).put(consensusGroupId, pipeTask);
+    final Map<TConsensusGroupId, PipeTask> dataRegionId2PipeTask =
+        pipeMap.computeIfAbsent(pipeStaticMeta, k -> new HashMap<>());
+    dataRegionId2PipeTask.put(consensusGroupId, pipeTask);
+
+    // update leader data region count
+    leaderDataRegionCount = Math.max(leaderDataRegionCount, 
dataRegionId2PipeTask.size());
   }
 
   /** Add pipe tasks by pipe static meta. */
   public synchronized void addPipeTasks(
       PipeStaticMeta pipeStaticMeta, Map<TConsensusGroupId, PipeTask> 
pipeTasks) {
-    pipeMap.computeIfAbsent(pipeStaticMeta, k -> new 
HashMap<>()).putAll(pipeTasks);
+    final Map<TConsensusGroupId, PipeTask> dataRegionId2PipeTask =
+        pipeMap.computeIfAbsent(pipeStaticMeta, k -> new HashMap<>());
+    dataRegionId2PipeTask.putAll(pipeTasks);
+
+    // update leader data region count
+    leaderDataRegionCount = Math.max(leaderDataRegionCount, 
dataRegionId2PipeTask.size());
   }
 
   /**
@@ -93,4 +110,13 @@ public class PipeTaskManager {
   public synchronized Map<TConsensusGroupId, PipeTask> 
getPipeTasks(PipeStaticMeta pipeStaticMeta) {
     return pipeMap.get(pipeStaticMeta);
   }
+
+  /**
+   * Get leader data region count in this data node.
+   *
+   * @return leader data region count
+   */
+  public int getLeaderDataRegionCount() {
+    return leaderDataRegionCount;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
index b3bdd7b9324..7a1dce27548 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -20,24 +20,80 @@
 package org.apache.iotdb.db.pipe.task.connection;
 
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class UnboundedBlockingPendingQueue<E extends Event> extends 
BlockingPendingQueue<E> {
 
   private final BlockingDeque<E> pendingDeque;
 
+  private final AtomicInteger tsfileInsertionEventCount;
+
   public UnboundedBlockingPendingQueue() {
     super(new LinkedBlockingDeque<>());
     pendingDeque = (BlockingDeque<E>) pendingQueue;
+    tsfileInsertionEventCount = new AtomicInteger(0);
+  }
+
+  @Override
+  public boolean waitedOffer(E event) {
+    final boolean offered = super.waitedOffer(event);
+    if (offered && event instanceof TsFileInsertionEvent) {
+      tsfileInsertionEventCount.incrementAndGet();
+    }
+    return offered;
+  }
+
+  @Override
+  public boolean directOffer(E event) {
+    final boolean offered = super.directOffer(event);
+    if (offered && event instanceof TsFileInsertionEvent) {
+      tsfileInsertionEventCount.incrementAndGet();
+    }
+    return offered;
+  }
+
+  @Override
+  public boolean put(E event) {
+    final boolean putSuccessfully = super.put(event);
+    if (putSuccessfully && event instanceof TsFileInsertionEvent) {
+      tsfileInsertionEventCount.incrementAndGet();
+    }
+    return putSuccessfully;
+  }
+
+  @Override
+  public E directPoll() {
+    final E event = super.directPoll();
+    if (event instanceof TsFileInsertionEvent) {
+      tsfileInsertionEventCount.decrementAndGet();
+    }
+    return event;
+  }
+
+  @Override
+  public E waitedPoll() {
+    final E event = super.waitedPoll();
+    if (event instanceof TsFileInsertionEvent) {
+      tsfileInsertionEventCount.decrementAndGet();
+    }
+    return event;
+  }
+
+  @Override
+  public void clear() {
+    super.clear();
+    tsfileInsertionEventCount.set(0);
   }
 
   public E peekLast() {
     return pendingDeque.peekLast();
   }
 
-  public E removeLast() {
-    return pendingDeque.removeLast();
+  public int getTsfileInsertionEventCount() {
+    return tsfileInsertionEventCount.get();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 8f5eab4f5d3..c96a70d07df 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -164,8 +164,7 @@ public class CommonConfig {
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private int pipeExtractorMatcherCacheSize = 1024;
-  private int pipeExtractorPendingQueueCapacity = 256;
-  private int pipeExtractorPendingQueueTabletLimit = 
pipeExtractorPendingQueueCapacity / 2;
+  private int pipeExtractorPendingQueueTsFileLimit = 3;
   private int pipeDataStructureTabletRowSize = 2048;
 
   private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
@@ -546,20 +545,12 @@ public class CommonConfig {
     this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
   }
 
-  public int getPipeExtractorPendingQueueCapacity() {
-    return pipeExtractorPendingQueueCapacity;
+  public int getPipeExtractorPendingQueueTsFileLimit() {
+    return pipeExtractorPendingQueueTsFileLimit;
   }
 
-  public void setPipeExtractorPendingQueueCapacity(int 
pipeExtractorPendingQueueCapacity) {
-    this.pipeExtractorPendingQueueCapacity = pipeExtractorPendingQueueCapacity;
-  }
-
-  public int getPipeExtractorPendingQueueTabletLimit() {
-    return pipeExtractorPendingQueueTabletLimit;
-  }
-
-  public void setPipeExtractorPendingQueueTabletLimit(int 
pipeExtractorPendingQueueTabletLimit) {
-    this.pipeExtractorPendingQueueTabletLimit = 
pipeExtractorPendingQueueTabletLimit;
+  public void setPipeExtractorPendingQueueTsFileLimit(int 
pipeExtractorPendingQueueTsfileLimit) {
+    this.pipeExtractorPendingQueueTsFileLimit = 
pipeExtractorPendingQueueTsfileLimit;
   }
 
   public long getPipeConnectorTimeoutMs() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 69d20269788..ab4be8eacfc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -311,16 +311,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_extractor_matcher_cache_size",
                 String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
-    config.setPipeExtractorPendingQueueCapacity(
+    config.setPipeExtractorPendingQueueTsFileLimit(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_extractor_pending_queue_capacity",
-                
String.valueOf(config.getPipeExtractorPendingQueueCapacity()))));
-    config.setPipeExtractorPendingQueueTabletLimit(
-        Integer.parseInt(
-            properties.getProperty(
-                "pipe_extractor_pending_queue_tablet_limit",
-                
String.valueOf(config.getPipeExtractorPendingQueueTabletLimit()))));
+                "pipe_extractor_pending_queue_tsfile_limit",
+                
String.valueOf(config.getPipeExtractorPendingQueueTsFileLimit()))));
 
     config.setPipeConnectorTimeoutMs(
         Long.parseLong(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4b46ac5e0fa..f19f948d6cc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -81,12 +81,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
   }
 
-  public int getPipeExtractorPendingQueueCapacity() {
-    return COMMON_CONFIG.getPipeExtractorPendingQueueCapacity();
-  }
-
-  public int getPipeExtractorPendingQueueTabletLimit() {
-    return COMMON_CONFIG.getPipeExtractorPendingQueueTabletLimit();
+  public int getPipeExtractorPendingQueueTsFileLimit() {
+    return COMMON_CONFIG.getPipeExtractorPendingQueueTsFileLimit();
   }
 
   /////////////////////////////// Connector ///////////////////////////////
@@ -186,9 +182,8 @@ public class PipeConfig {
         "PipeExtractorAssignerDisruptorRingBufferSize: {}",
         getPipeExtractorAssignerDisruptorRingBufferSize());
     LOGGER.info("PipeExtractorMatcherCacheSize: {}", 
getPipeExtractorMatcherCacheSize());
-    LOGGER.info("PipeExtractorPendingQueueCapacity: {}", 
getPipeExtractorPendingQueueCapacity());
     LOGGER.info(
-        "PipeExtractorPendingQueueTabletLimit: {}", 
getPipeExtractorPendingQueueTabletLimit());
+        "PipeExtractorPendingQueueTsFileLimit: {}", 
getPipeExtractorPendingQueueTsFileLimit());
 
     LOGGER.info("PipeConnectorTimeoutMs: {}", getPipeConnectorTimeoutMs());
     LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());

Reply via email to