This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ea72dfe7bdf213bb0cd607014d905425b30bb946 Author: Zhenyu Luo <[email protected]> AuthorDate: Mon Jul 29 15:20:05 2024 +0800 Pipe: Limit the frequency of progress report for non-forwarding pipe requests to reduce the overhead when sync data between clusters (#13041) (cherry picked from commit 8af04248057885d88e05348f9b2507ce7e834f48) --- .../realtime/assigner/PipeDataRegionAssigner.java | 17 +++++++++++++++++ .../org/apache/iotdb/commons/conf/CommonConfig.java | 12 ++++++++++++ .../org/apache/iotdb/commons/conf/CommonDescriptor.java | 6 ++++++ .../apache/iotdb/commons/pipe/config/PipeConfig.java | 10 ++++++++++ 4 files changed, 45 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 1042b1924a0..762577d2f84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; @@ -29,11 +30,15 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics; import org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher; import org.apache.iotdb.db.pipe.pattern.PipeDataRegionMatcher; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import java.io.Closeable; public class 
PipeDataRegionAssigner implements Closeable { + private static final int nonForwardingEventsProgressReportInterval = + PipeConfig.getInstance().getPipeNonForwardingEventsProgressReportInterval(); + /** * The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the * pattern. @@ -45,6 +50,8 @@ public class PipeDataRegionAssigner implements Closeable { private final String dataRegionId; + private int counter = 0; + public String getDataRegionId() { return dataRegionId; } @@ -73,6 +80,16 @@ public class PipeDataRegionAssigner implements Closeable { .forEach( extractor -> { if (event.getEvent().isGeneratedByPipe() && !extractor.isForwardingPipeRequests()) { + // The frequency of progress reports is limited by the counter, while progress + // reports to TsFileInsertionEvent are not limited. + if (!(event.getEvent() instanceof TsFileInsertionEvent)) { + if (counter < nonForwardingEventsProgressReportInterval) { + counter++; + return; + } + counter = 0; + } + final ProgressReportEvent reportEvent = new ProgressReportEvent( extractor.getPipeName(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index abcf0679dca..245614d2c09 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -188,6 +188,8 @@ public class CommonConfig { private int pipeSubtaskExecutorMaxThreadNum = Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); + private int pipeNonForwardingEventsProgressReportInterval = 100; + private int pipeDataStructureTabletRowSize = 2048; private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.4; @@ -601,6 +603,16 @@ public class CommonConfig { return timestampPrecisionCheckEnabled; } + public int 
getPipeNonForwardingEventsProgressReportInterval() { + return pipeNonForwardingEventsProgressReportInterval; + } + + public void setPipeNonForwardingEventsProgressReportInterval( + int pipeNonForwardingEventsProgressReportInterval) { + this.pipeNonForwardingEventsProgressReportInterval = + pipeNonForwardingEventsProgressReportInterval; + } + public String getPipeHardlinkBaseDirName() { return pipeHardlinkBaseDirName; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 864df3d75d7..36003ec30e1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -251,6 +251,12 @@ public class CommonDescriptor { } private void loadPipeProps(Properties properties) { + config.setPipeNonForwardingEventsProgressReportInterval( + Integer.parseInt( + properties.getProperty( + "pipe_non_forwarding_events_progress_report_interval", + Integer.toString(config.getPipeNonForwardingEventsProgressReportInterval())))); + config.setPipeHardlinkBaseDirName( properties.getProperty("pipe_hardlink_base_dir_name", config.getPipeHardlinkBaseDirName())); config.setPipeHardlinkTsFileDirName( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 5c0d272f69b..b215c7a3200 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -30,6 +30,12 @@ public class PipeConfig { private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + /////////////////////////////// Data Synchronization 
/////////////////////////////// + + public int getPipeNonForwardingEventsProgressReportInterval() { + return COMMON_CONFIG.getPipeNonForwardingEventsProgressReportInterval(); + } + /////////////////////////////// File /////////////////////////////// public String getPipeHardlinkBaseDirName() { @@ -301,6 +307,10 @@ public class PipeConfig { private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfig.class); public void printAllConfigs() { + LOGGER.info( + "PipeNonForwardingEventsProgressReportInterval: {}", + getPipeNonForwardingEventsProgressReportInterval()); + LOGGER.info("PipeHardlinkBaseDirName: {}", getPipeHardlinkBaseDirName()); LOGGER.info("PipeHardlinkTsFileDirName: {}", getPipeHardlinkTsFileDirName()); LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
