This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8af04248057 Pipe: Limit the frequency of progress report for
non-forwarding pipe requests to reduce the overhead when sync data between
clusters (#13041)
8af04248057 is described below
commit 8af04248057885d88e05348f9b2507ce7e834f48
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jul 29 15:20:05 2024 +0800
Pipe: Limit the frequency of progress report for non-forwarding pipe
requests to reduce the overhead when sync data between clusters (#13041)
---
.../realtime/assigner/PipeDataRegionAssigner.java | 17 +++++++++++++++++
.../org/apache/iotdb/commons/conf/CommonConfig.java | 12 ++++++++++++
.../org/apache/iotdb/commons/conf/CommonDescriptor.java | 6 ++++++
.../apache/iotdb/commons/pipe/config/PipeConfig.java | 10 ++++++++++
4 files changed, 45 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 1042b1924a0..762577d2f84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -29,11 +30,15 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
import org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher;
import org.apache.iotdb.db.pipe.pattern.PipeDataRegionMatcher;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.io.Closeable;
public class PipeDataRegionAssigner implements Closeable {
+ private static final int nonForwardingEventsProgressReportInterval =
+
PipeConfig.getInstance().getPipeNonForwardingEventsProgressReportInterval();
+
/**
* The {@link PipeDataRegionMatcher} is used to match the event with the
extractor based on the
* pattern.
@@ -45,6 +50,8 @@ public class PipeDataRegionAssigner implements Closeable {
private final String dataRegionId;
+ private int counter = 0;
+
public String getDataRegionId() {
return dataRegionId;
}
@@ -73,6 +80,16 @@ public class PipeDataRegionAssigner implements Closeable {
.forEach(
extractor -> {
if (event.getEvent().isGeneratedByPipe() &&
!extractor.isForwardingPipeRequests()) {
+ // The frequency of progress reports is limited by the
counter, while progress
+ // reports to TsFileInsertionEvent are not limited.
+ if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
+ if (counter < nonForwardingEventsProgressReportInterval) {
+ counter++;
+ return;
+ }
+ counter = 0;
+ }
+
final ProgressReportEvent reportEvent =
new ProgressReportEvent(
extractor.getPipeName(),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index abcf0679dca..245614d2c09 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -188,6 +188,8 @@ public class CommonConfig {
private int pipeSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+ private int pipeNonForwardingEventsProgressReportInterval = 100;
+
private int pipeDataStructureTabletRowSize = 2048;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.4;
@@ -601,6 +603,16 @@ public class CommonConfig {
return timestampPrecisionCheckEnabled;
}
+ public int getPipeNonForwardingEventsProgressReportInterval() {
+ return pipeNonForwardingEventsProgressReportInterval;
+ }
+
+ public void setPipeNonForwardingEventsProgressReportInterval(
+ int pipeNonForwardingEventsProgressReportInterval) {
+ this.pipeNonForwardingEventsProgressReportInterval =
+ pipeNonForwardingEventsProgressReportInterval;
+ }
+
public String getPipeHardlinkBaseDirName() {
return pipeHardlinkBaseDirName;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 864df3d75d7..36003ec30e1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -251,6 +251,12 @@ public class CommonDescriptor {
}
private void loadPipeProps(Properties properties) {
+ config.setPipeNonForwardingEventsProgressReportInterval(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_non_forwarding_events_progress_report_interval",
+
Integer.toString(config.getPipeNonForwardingEventsProgressReportInterval()))));
+
config.setPipeHardlinkBaseDirName(
properties.getProperty("pipe_hardlink_base_dir_name",
config.getPipeHardlinkBaseDirName()));
config.setPipeHardlinkTsFileDirName(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 5c0d272f69b..b215c7a3200 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -30,6 +30,12 @@ public class PipeConfig {
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
+ /////////////////////////////// Data Synchronization
///////////////////////////////
+
+ public int getPipeNonForwardingEventsProgressReportInterval() {
+ return COMMON_CONFIG.getPipeNonForwardingEventsProgressReportInterval();
+ }
+
/////////////////////////////// File ///////////////////////////////
public String getPipeHardlinkBaseDirName() {
@@ -301,6 +307,10 @@ public class PipeConfig {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);
public void printAllConfigs() {
+ LOGGER.info(
+ "PipeNonForwardingEventsProgressReportInterval: {}",
+ getPipeNonForwardingEventsProgressReportInterval());
+
LOGGER.info("PipeHardlinkBaseDirName: {}", getPipeHardlinkBaseDirName());
LOGGER.info("PipeHardlinkTsFileDirName: {}",
getPipeHardlinkTsFileDirName());
LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());