This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b3a0a4ab949 Pipe: Introduce counter to reduce historical data buildup
in PipeRealtimePriorityBlockingQueue (#12881)
b3a0a4ab949 is described below
commit b3a0a4ab949ba1c7c86de3f1d2f04f9c3661b61f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Jul 12 19:18:28 2024 +0800
Pipe: Introduce counter to reduce historical data buildup in
PipeRealtimePriorityBlockingQueue (#12881)
---
.../it/local/IoTDBSubscriptionBasicIT.java | 18 +++++---
.../PipeRealtimePriorityBlockingQueue.java | 54 +++++++++++++++++-----
.../apache/iotdb/commons/conf/CommonConfig.java | 10 ++++
.../iotdb/commons/conf/CommonDescriptor.java | 6 +++
.../iotdb/commons/pipe/config/PipeConfig.java | 9 ++++
5 files changed, 80 insertions(+), 17 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index b89bd776caf..8f5e8683746 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -168,9 +168,12 @@ public class IoTDBSubscriptionBasicIT extends
AbstractSubscriptionLocalIT {
// Check row count
try {
// Keep retrying if there are execution failures
- AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
- Assert.assertTrue(commitSuccessCount.get() >
lastCommitSuccessCount.get());
- Assert.assertEquals(0, commitFailureCount.get());
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertEquals(100, rowCount.get());
+ Assert.assertTrue(commitSuccessCount.get() >
lastCommitSuccessCount.get());
+ Assert.assertEquals(0, commitFailureCount.get());
+ });
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -194,9 +197,12 @@ public class IoTDBSubscriptionBasicIT extends
AbstractSubscriptionLocalIT {
// Check row count
try {
// Keep retrying if there are execution failures
- AWAIT.untilAsserted(() -> Assert.assertEquals(200, rowCount.get()));
- Assert.assertTrue(commitSuccessCount.get() >
lastCommitSuccessCount.get());
- Assert.assertEquals(0, commitFailureCount.get());
+ AWAIT.untilAsserted(
+ () -> {
+ Assert.assertEquals(200, rowCount.get());
+ Assert.assertTrue(commitSuccessCount.get() >
lastCommitSuccessCount.get());
+ Assert.assertEquals(0, commitFailureCount.get());
+ });
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index b862907181d..a169cb6a338 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.task.subtask.connector;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -30,6 +31,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -38,6 +40,11 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
private final BlockingDeque<TsFileInsertionEvent> tsfileInsertEventDeque =
new LinkedBlockingDeque<>();
+ private final AtomicInteger eventCount = new AtomicInteger(0);
+
+ private static final int pollHistoryThreshold =
+ PipeConfig.getInstance().getPipeRealTimeQueuePollHistoryThreshold();
+
public PipeRealtimePriorityBlockingQueue() {
super(new PipeDataRegionEventCounter());
}
@@ -71,30 +78,52 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
@Override
public Event directPoll() {
- Event event = super.directPoll();
+ Event event = null;
+ if (eventCount.get() >= pollHistoryThreshold) {
+ event = tsfileInsertEventDeque.pollFirst();
+ eventCount.set(0);
+ }
if (Objects.isNull(event)) {
- event = tsfileInsertEventDeque.pollLast();
+ event = super.directPoll();
+ if (Objects.isNull(event)) {
+ event = tsfileInsertEventDeque.pollLast();
+ }
+ if (event != null) {
+ eventCount.incrementAndGet();
+ }
}
+
return event;
}
/**
- * Try to poll the freshest insertion event from the queue. First, try to
poll the first offered
- * non-TsFileInsertionEvent. If no such event is available, poll the last
offered
- * TsFileInsertionEvent. If no event is available, block until an event is
available.
+ * When the number of events polled since the last reset reaches the pollHistoryThreshold, the
+ * earliest offered {@link TsFileInsertionEvent} is polled first and the poll counter is reset.
+ * If the pollHistoryThreshold is not reached, or no {@link TsFileInsertionEvent} is queued, an
+ * attempt is made to poll the queue for the latest insertion {@link Event}. First, it tries to
+ * poll the first offered non-TsFileInsertionEvent. If there is no such {@link Event}, it polls
+ * the last offered {@link TsFileInsertionEvent}. If no {@link Event} is available, it blocks
+ * until an {@link Event} is available.
*
- * @return the freshest insertion event. can be null if no event is
available.
+ * @return the freshest insertion {@link Event}. can be {@code null} if no
{@link Event} is
+ * available.
*/
@Override
public Event waitedPoll() {
Event event = null;
-
- if (!super.isEmpty()) {
+ if (eventCount.get() >= pollHistoryThreshold) {
+ event = tsfileInsertEventDeque.pollFirst();
+ eventCount.set(0);
+ }
+ if (event == null) {
// Sequentially poll the first offered non-TsFileInsertionEvent
event = super.directPoll();
- } else if (!tsfileInsertEventDeque.isEmpty()) {
- // Always poll the last offered event
- event = tsfileInsertEventDeque.pollLast();
+ if (event == null && !tsfileInsertEventDeque.isEmpty()) {
+ // Always poll the last offered event
+ event = tsfileInsertEventDeque.pollLast();
+ }
+ if (event != null) {
+ eventCount.incrementAndGet();
+ }
}
// If no event is available, block until an event is available
@@ -103,6 +132,9 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
if (Objects.isNull(event)) {
event = tsfileInsertEventDeque.pollLast();
}
+ if (event != null) {
+ eventCount.incrementAndGet();
+ }
}
return event;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index d8cb2133aad..4b426e842b0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -182,6 +182,8 @@ public class CommonConfig {
private boolean pipeHardLinkWALEnabled = false;
+ private int pipeRealTimeQueuePollHistoryThreshold = 100;
+
/** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
@@ -845,6 +847,14 @@ public class CommonConfig {
pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds;
}
+ public int getPipeRealTimeQueuePollHistoryThreshold() {
+ return pipeRealTimeQueuePollHistoryThreshold;
+ }
+
+ public void setPipeRealTimeQueuePollHistoryThreshold(int
pipeRealTimeQueuePollHistoryThreshold) {
+ this.pipeRealTimeQueuePollHistoryThreshold =
pipeRealTimeQueuePollHistoryThreshold;
+ }
+
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
this.pipeAirGapReceiverEnabled = pipeAirGapReceiverEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e73b97989b1..6c264c24e2c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -276,6 +276,12 @@ public class CommonDescriptor {
String.valueOf(
config.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold()))));
+ config.setPipeRealTimeQueuePollHistoryThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_realtime_queue_poll_history_threshold",
+
Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold()))));
+
config.setPipeSubtaskExecutorMaxThreadNum(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 270a5d4fb8e..28a973def1f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -58,6 +58,12 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold();
}
+ /////////////////////////////// Subtask Connector
///////////////////////////////
+
+ public int getPipeRealTimeQueuePollHistoryThreshold() {
+ return COMMON_CONFIG.getPipeRealTimeQueuePollHistoryThreshold();
+ }
+
/////////////////////////////// Subtask Executor
///////////////////////////////
public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -305,6 +311,9 @@ public class PipeConfig {
"PipeDataStructureTabletMemoryBlockAllocationRejectThreshold: {}",
getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold());
+ LOGGER.info(
+ "PipeRealTimeQueuePollHistoryThreshold: {}",
getPipeRealTimeQueuePollHistoryThreshold());
+
LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}",
getPipeSubtaskExecutorMaxThreadNum());
LOGGER.info(
"PipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount: {}",