This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 90cc430f6f1 Pipe: Added parameters to the snapshot transfer threshold
from last snapshot to queue's tail index (#12290)
90cc430f6f1 is described below
commit 90cc430f6f1f83065e4aa460c007274c1c62a563
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 3 18:57:52 2024 +0800
Pipe: Added parameters to the snapshot transfer threshold from last
snapshot to queue's tail index (#12290)
---
.../main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 10 ++++++++++
.../java/org/apache/iotdb/commons/conf/CommonDescriptor.java | 5 +++++
.../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java | 7 +++++++
.../queue/listening/AbstractPipeListeningQueue.java | 6 ++++--
4 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 0c9e28ae500..22a8616ee95 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -226,6 +226,7 @@ public class CommonConfig {
private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 *
1024 * 1024; // 2MB
private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
+ private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int subscriptionSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
@@ -951,6 +952,15 @@ public class CommonConfig {
this.pipeLeaderCacheMemoryUsagePercentage =
pipeLeaderCacheMemoryUsagePercentage;
}
+ public long getPipeListeningQueueTransferSnapshotThreshold() {
+ return pipeListeningQueueTransferSnapshotThreshold;
+ }
+
+ public void setPipeListeningQueueTransferSnapshotThreshold(
+ long pipeListeningQueueTransferSnapshotThreshold) {
+ this.pipeListeningQueueTransferSnapshotThreshold = pipeListeningQueueTransferSnapshotThreshold;
+ }
+
public int getSubscriptionSubtaskExecutorMaxThreadNum() {
return subscriptionSubtaskExecutorMaxThreadNum;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 3a9e2017902..4e347ccd41a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -511,6 +511,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_leader_cache_memory_usage_percentage",
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
+ config.setPipeListeningQueueTransferSnapshotThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_listening_queue_transfer_snapshot_threshold",
+ String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));
}
private void loadSubscriptionProps(Properties properties) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 80223a4439c..88405d64c93 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -131,6 +131,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
}
+ public long getPipeListeningQueueTransferSnapshotThreshold() {
+ return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
+ }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -284,6 +288,9 @@ public class PipeConfig {
isPipeConnectorRPCThriftCompressionEnabled());
LOGGER.info(
"PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
+ LOGGER.info(
+ "PipeListeningQueueTransferSnapshotThreshold: {}",
+ getPipeListeningQueueTransferSnapshotThreshold());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
index 8d98f024604..d5d1edb345c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.datastructure.queue.listening;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.queue.serializer.QueueSerializerType;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
@@ -77,8 +78,9 @@ public abstract class AbstractPipeListeningQueue extends
AbstractSerializableLis
}
public synchronized Pair<Long, List<PipeSnapshotEvent>>
findAvailableSnapshots() {
- // TODO: configure maximum number of events from snapshot to queue tail
- if (queueTailIndex2SnapshotsCache.getLeft() < queue.getTailIndex() - 1000) {
+ if (queueTailIndex2SnapshotsCache.getLeft()
+ < queue.getTailIndex()
+ -
PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
clearSnapshots();
}
return queueTailIndex2SnapshotsCache;