This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 90cc430f6f1 Pipe: Added parameters to the snapshot transfer threshold from last snapshot to queue's tail index (#12290)
90cc430f6f1 is described below

commit 90cc430f6f1f83065e4aa460c007274c1c62a563
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 3 18:57:52 2024 +0800

    Pipe: Added parameters to the snapshot transfer threshold from last snapshot to queue's tail index (#12290)
---
 .../main/java/org/apache/iotdb/commons/conf/CommonConfig.java  | 10 ++++++++++
 .../java/org/apache/iotdb/commons/conf/CommonDescriptor.java   |  5 +++++
 .../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java  |  7 +++++++
 .../queue/listening/AbstractPipeListeningQueue.java            |  6 ++++--
 4 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 0c9e28ae500..22a8616ee95 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -226,6 +226,7 @@ public class CommonConfig {
   private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 * 1024 * 1024; // 2MB
   private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
   private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
+  private long pipeListeningQueueTransferSnapshotThreshold = 1000;
 
   private int subscriptionSubtaskExecutorMaxThreadNum =
       Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
@@ -951,6 +952,15 @@ public class CommonConfig {
     this.pipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage;
   }
 
+  public long getPipeListeningQueueTransferSnapshotThreshold() {
+    return pipeListeningQueueTransferSnapshotThreshold;
+  }
+
+  public void setPipeListeningQueueTransferSnapshotThreshold(
+      long pipeListeningQueueTransferSnapshotThreshold) {
+    this.pipeListeningQueueTransferSnapshotThreshold = pipeListeningQueueTransferSnapshotThreshold;
+  }
+
   public int getSubscriptionSubtaskExecutorMaxThreadNum() {
     return subscriptionSubtaskExecutorMaxThreadNum;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 3a9e2017902..4e347ccd41a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -511,6 +511,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_leader_cache_memory_usage_percentage",
                 String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
+    config.setPipeListeningQueueTransferSnapshotThreshold(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_listening_queue_transfer_snapshot_threshold",
+                String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold()))));
   }
 
   private void loadSubscriptionProps(Properties properties) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 80223a4439c..88405d64c93 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -131,6 +131,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
   }
 
+  public long getPipeListeningQueueTransferSnapshotThreshold() {
+    return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold();
+  }
+
   /////////////////////////////// Meta Consistency ///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -284,6 +288,9 @@ public class PipeConfig {
         isPipeConnectorRPCThriftCompressionEnabled());
     LOGGER.info(
         "PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage());
+    LOGGER.info(
+        "PipeListeningQueueTransferSnapshotThreshold: {}",
+        getPipeListeningQueueTransferSnapshotThreshold());
 
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
index 8d98f024604..d5d1edb345c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.datastructure.queue.listening;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.queue.serializer.QueueSerializerType;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
@@ -77,8 +78,9 @@ public abstract class AbstractPipeListeningQueue extends AbstractSerializableLis
   }
 
   public synchronized Pair<Long, List<PipeSnapshotEvent>> findAvailableSnapshots() {
-    // TODO: configure maximum number of events from snapshot to queue tail
-    if (queueTailIndex2SnapshotsCache.getLeft() < queue.getTailIndex() - 1000) {
+    if (queueTailIndex2SnapshotsCache.getLeft()
+        < queue.getTailIndex()
+            - PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
       clearSnapshots();
     }
     return queueTailIndex2SnapshotsCache;

Reply via email to