This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ea72dfe7bdf213bb0cd607014d905425b30bb946
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jul 29 15:20:05 2024 +0800

    Pipe: Limit the frequency of progress report for non-forwarding pipe requests to reduce the overhead when sync data between clusters (#13041)
    
    (cherry picked from commit 8af04248057885d88e05348f9b2507ce7e834f48)
---
 .../realtime/assigner/PipeDataRegionAssigner.java       | 17 +++++++++++++++++
 .../org/apache/iotdb/commons/conf/CommonConfig.java     | 12 ++++++++++++
 .../org/apache/iotdb/commons/conf/CommonDescriptor.java |  6 ++++++
 .../apache/iotdb/commons/pipe/config/PipeConfig.java    | 10 ++++++++++
 4 files changed, 45 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 1042b1924a0..762577d2f84 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -29,11 +30,15 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
 import org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher;
 import org.apache.iotdb.db.pipe.pattern.PipeDataRegionMatcher;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.io.Closeable;
 
 public class PipeDataRegionAssigner implements Closeable {
 
+  private static final int nonForwardingEventsProgressReportInterval =
+      
PipeConfig.getInstance().getPipeNonForwardingEventsProgressReportInterval();
+
   /**
    * The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the
    * pattern.
@@ -45,6 +50,8 @@ public class PipeDataRegionAssigner implements Closeable {
 
   private final String dataRegionId;
 
+  private int counter = 0;
+
   public String getDataRegionId() {
     return dataRegionId;
   }
@@ -73,6 +80,16 @@ public class PipeDataRegionAssigner implements Closeable {
         .forEach(
             extractor -> {
               if (event.getEvent().isGeneratedByPipe() && 
!extractor.isForwardingPipeRequests()) {
+                // The frequency of progress reports is limited by the counter, while progress
+                // reports for TsFileInsertionEvent are not limited.
+                if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
+                  if (counter < nonForwardingEventsProgressReportInterval) {
+                    counter++;
+                    return;
+                  }
+                  counter = 0;
+                }
+
                 final ProgressReportEvent reportEvent =
                     new ProgressReportEvent(
                         extractor.getPipeName(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index abcf0679dca..245614d2c09 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -188,6 +188,8 @@ public class CommonConfig {
   private int pipeSubtaskExecutorMaxThreadNum =
       Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
 
+  private int pipeNonForwardingEventsProgressReportInterval = 100;
+
   private int pipeDataStructureTabletRowSize = 2048;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.4;
 
@@ -601,6 +603,16 @@ public class CommonConfig {
     return timestampPrecisionCheckEnabled;
   }
 
+  public int getPipeNonForwardingEventsProgressReportInterval() {
+    return pipeNonForwardingEventsProgressReportInterval;
+  }
+
+  public void setPipeNonForwardingEventsProgressReportInterval(
+      int pipeNonForwardingEventsProgressReportInterval) {
+    this.pipeNonForwardingEventsProgressReportInterval =
+        pipeNonForwardingEventsProgressReportInterval;
+  }
+
   public String getPipeHardlinkBaseDirName() {
     return pipeHardlinkBaseDirName;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 864df3d75d7..36003ec30e1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -251,6 +251,12 @@ public class CommonDescriptor {
   }
 
   private void loadPipeProps(Properties properties) {
+    config.setPipeNonForwardingEventsProgressReportInterval(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_non_forwarding_events_progress_report_interval",
+                
Integer.toString(config.getPipeNonForwardingEventsProgressReportInterval()))));
+
     config.setPipeHardlinkBaseDirName(
         properties.getProperty("pipe_hardlink_base_dir_name", 
config.getPipeHardlinkBaseDirName()));
     config.setPipeHardlinkTsFileDirName(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 5c0d272f69b..b215c7a3200 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -30,6 +30,12 @@ public class PipeConfig {
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
 
+  /////////////////////////////// Data Synchronization 
///////////////////////////////
+
+  public int getPipeNonForwardingEventsProgressReportInterval() {
+    return COMMON_CONFIG.getPipeNonForwardingEventsProgressReportInterval();
+  }
+
   /////////////////////////////// File ///////////////////////////////
 
   public String getPipeHardlinkBaseDirName() {
@@ -301,6 +307,10 @@ public class PipeConfig {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConfig.class);
 
   public void printAllConfigs() {
+    LOGGER.info(
+        "PipeNonForwardingEventsProgressReportInterval: {}",
+        getPipeNonForwardingEventsProgressReportInterval());
+
     LOGGER.info("PipeHardlinkBaseDirName: {}", getPipeHardlinkBaseDirName());
     LOGGER.info("PipeHardlinkTsFileDirName: {}", 
getPipeHardlinkTsFileDirName());
     LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());

Reply via email to