This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 43ed86500d6 PipeConsensus: always execute flush for historical data 
extraction of consensus pipe to reduce data sync delay (#14132)
43ed86500d6 is described below

commit 43ed86500d609e6664740d36580e2a39a29ceb80
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Nov 21 11:08:09 2024 +0800

    PipeConsensus: always execute flush for historical data extraction of 
consensus pipe to reduce data sync delay (#14132)
---
 ...eHistoricalDataRegionTsFileAndDeletionExtractor.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index ee44ac465be..61b721f8877 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 import 
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
@@ -502,6 +503,22 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
   private void flushTsFilesForExtraction(
       DataRegion dataRegion, final long startHistoricalExtractionTime) {
     LOGGER.info("Pipe {}@{}: start to flush data region", pipeName, 
dataRegionId);
+
+    // Consider the scenario: a consensus pipe comes to the same region, 
followed by another pipe
+    // **immediately**, the latter pipe will skip the flush operation.
+    // Since a large number of consensus pipes are not created at the same 
time, resulting in no
+    // serious waiting for locks. Therefore, the flush operation is always 
performed for the
+    // consensus pipe, and the lastFlushed timestamp is not updated here.
+    if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
+      LOGGER.info(
+          "Pipe {}@{}: finish to flush data region, took {} ms",
+          pipeName,
+          dataRegionId,
+          System.currentTimeMillis() - startHistoricalExtractionTime);
+      return;
+    }
+
     synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
       final long lastFlushedByPipeTime = 
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
       if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {

Reply via email to