This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b3a0a4ab949 Pipe: Introduce counter to reduce historical data buildup 
in PipeRealtimePriorityBlockingQueue (#12881)
b3a0a4ab949 is described below

commit b3a0a4ab949ba1c7c86de3f1d2f04f9c3661b61f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Jul 12 19:18:28 2024 +0800

    Pipe: Introduce counter to reduce historical data buildup in 
PipeRealtimePriorityBlockingQueue (#12881)
---
 .../it/local/IoTDBSubscriptionBasicIT.java         | 18 +++++---
 .../PipeRealtimePriorityBlockingQueue.java         | 54 +++++++++++++++++-----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 10 ++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  6 +++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  9 ++++
 5 files changed, 80 insertions(+), 17 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index b89bd776caf..8f5e8683746 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -168,9 +168,12 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
     // Check row count
     try {
       // Keep retrying if there are execution failures
-      AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
-      Assert.assertTrue(commitSuccessCount.get() > 
lastCommitSuccessCount.get());
-      Assert.assertEquals(0, commitFailureCount.get());
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertEquals(100, rowCount.get());
+            Assert.assertTrue(commitSuccessCount.get() > 
lastCommitSuccessCount.get());
+            Assert.assertEquals(0, commitFailureCount.get());
+          });
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -194,9 +197,12 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
     // Check row count
     try {
       // Keep retrying if there are execution failures
-      AWAIT.untilAsserted(() -> Assert.assertEquals(200, rowCount.get()));
-      Assert.assertTrue(commitSuccessCount.get() > 
lastCommitSuccessCount.get());
-      Assert.assertEquals(0, commitFailureCount.get());
+      AWAIT.untilAsserted(
+          () -> {
+            Assert.assertEquals(200, rowCount.get());
+            Assert.assertTrue(commitSuccessCount.get() > 
lastCommitSuccessCount.get());
+            Assert.assertEquals(0, commitFailureCount.get());
+          });
     } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index b862907181d..a169cb6a338 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.subtask.connector;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -30,6 +31,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import java.util.Objects;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
@@ -38,6 +40,11 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   private final BlockingDeque<TsFileInsertionEvent> tsfileInsertEventDeque =
       new LinkedBlockingDeque<>();
 
+  private final AtomicInteger eventCount = new AtomicInteger(0);
+
+  private static final int pollHistoryThreshold =
+      PipeConfig.getInstance().getPipeRealTimeQueuePollHistoryThreshold();
+
   public PipeRealtimePriorityBlockingQueue() {
     super(new PipeDataRegionEventCounter());
   }
@@ -71,30 +78,52 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
 
   @Override
   public Event directPoll() {
-    Event event = super.directPoll();
+    Event event = null;
+    if (eventCount.get() >= pollHistoryThreshold) {
+      event = tsfileInsertEventDeque.pollFirst();
+      eventCount.set(0);
+    }
     if (Objects.isNull(event)) {
-      event = tsfileInsertEventDeque.pollLast();
+      event = super.directPoll();
+      if (Objects.isNull(event)) {
+        event = tsfileInsertEventDeque.pollLast();
+      }
+      if (event != null) {
+        eventCount.incrementAndGet();
+      }
     }
+
     return event;
   }
 
   /**
-   * Try to poll the freshest insertion event from the queue. First, try to 
poll the first offered
-   * non-TsFileInsertionEvent. If no such event is available, poll the last 
offered
-   * TsFileInsertionEvent. If no event is available, block until an event is 
available.
+   * When the number of polls exceeds the pollHistoryThreshold, the {@link 
TsFileInsertionEvent} of
+   * the earliest write to the queue is returned. if the pollHistoryThreshold 
is not reached then an
+   * attempt is made to poll the queue for the latest insertion {@link Event}. 
First, it tries to
+   * poll the first provided If there is no such {@link Event}, poll the last 
supplied {@link
+   * TsFileInsertionEvent}. If no {@link Event} is available, it blocks until 
a {@link Event} is
+   * available.
    *
-   * @return the freshest insertion event. can be null if no event is 
available.
+   * @return the freshest insertion {@link Event}. can be {@code null} if no 
{@link Event} is
+   *     available.
    */
   @Override
   public Event waitedPoll() {
     Event event = null;
-
-    if (!super.isEmpty()) {
+    if (eventCount.get() >= pollHistoryThreshold) {
+      event = tsfileInsertEventDeque.pollFirst();
+      eventCount.set(0);
+    }
+    if (event == null) {
       // Sequentially poll the first offered non-TsFileInsertionEvent
       event = super.directPoll();
-    } else if (!tsfileInsertEventDeque.isEmpty()) {
-      // Always poll the last offered event
-      event = tsfileInsertEventDeque.pollLast();
+      if (event == null && !tsfileInsertEventDeque.isEmpty()) {
+        // Always poll the last offered event
+        event = tsfileInsertEventDeque.pollLast();
+      }
+      if (event != null) {
+        eventCount.incrementAndGet();
+      }
     }
 
     // If no event is available, block until an event is available
@@ -103,6 +132,9 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
       if (Objects.isNull(event)) {
         event = tsfileInsertEventDeque.pollLast();
       }
+      if (event != null) {
+        eventCount.incrementAndGet();
+      }
     }
 
     return event;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index d8cb2133aad..4b426e842b0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -182,6 +182,8 @@ public class CommonConfig {
 
   private boolean pipeHardLinkWALEnabled = false;
 
+  private int pipeRealTimeQueuePollHistoryThreshold = 100;
+
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum =
       Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
@@ -845,6 +847,14 @@ public class CommonConfig {
         pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds;
   }
 
+  public int getPipeRealTimeQueuePollHistoryThreshold() {
+    return pipeRealTimeQueuePollHistoryThreshold;
+  }
+
+  public void setPipeRealTimeQueuePollHistoryThreshold(int 
pipeRealTimeQueuePollHistoryThreshold) {
+    this.pipeRealTimeQueuePollHistoryThreshold = 
pipeRealTimeQueuePollHistoryThreshold;
+  }
+
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
     this.pipeAirGapReceiverEnabled = pipeAirGapReceiverEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e73b97989b1..6c264c24e2c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -276,6 +276,12 @@ public class CommonDescriptor {
                 String.valueOf(
                     
config.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold()))));
 
+    config.setPipeRealTimeQueuePollHistoryThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_realtime_queue_poll_history_threshold",
+                
Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold()))));
+
     config.setPipeSubtaskExecutorMaxThreadNum(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 270a5d4fb8e..28a973def1f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -58,6 +58,12 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold();
   }
 
+  /////////////////////////////// Subtask Connector 
///////////////////////////////
+
+  public int getPipeRealTimeQueuePollHistoryThreshold() {
+    return COMMON_CONFIG.getPipeRealTimeQueuePollHistoryThreshold();
+  }
+
   /////////////////////////////// Subtask Executor 
///////////////////////////////
 
   public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -305,6 +311,9 @@ public class PipeConfig {
         "PipeDataStructureTabletMemoryBlockAllocationRejectThreshold: {}",
         getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold());
 
+    LOGGER.info(
+        "PipeRealTimeQueuePollHistoryThreshold: {}", 
getPipeRealTimeQueuePollHistoryThreshold());
+
     LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}", 
getPipeSubtaskExecutorMaxThreadNum());
     LOGGER.info(
         "PipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount: {}",

Reply via email to