This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 73f4e7b2494 PipeConsensus: always execute flush for historical data 
extraction of consensus pipe to reduce data sync delay (#14132) (#14154)
73f4e7b2494 is described below

commit 73f4e7b249422fa7fcace29c2b698beb1c559451
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Nov 21 16:50:50 2024 +0800

    PipeConsensus: always execute flush for historical data extraction of 
consensus pipe to reduce data sync delay (#14132) (#14154)
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 51 ++++++++++++++--------
 1 file changed, 34 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 71f62e98871..80d43a320b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 import 
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -395,23 +396,39 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     final long startHistoricalExtractionTime = System.currentTimeMillis();
     try {
       LOGGER.info("Pipe {}@{}: start to flush data region", pipeName, 
dataRegionId);
-      synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
-        final long lastFlushedByPipeTime =
-            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
-        if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
-          dataRegion.syncCloseAllWorkingTsFileProcessors();
-          DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, 
System.currentTimeMillis());
-          LOGGER.info(
-              "Pipe {}@{}: finish to flush data region, took {} ms",
-              pipeName,
-              dataRegionId,
-              System.currentTimeMillis() - startHistoricalExtractionTime);
-        } else {
-          LOGGER.info(
-              "Pipe {}@{}: skip to flush data region, last flushed time {} ms 
ago",
-              pipeName,
-              dataRegionId,
-              System.currentTimeMillis() - lastFlushedByPipeTime);
+
+      // Consider the scenario: a consensus pipe comes to the same region, 
followed by another pipe
+      // **immediately**; the latter pipe will skip the flush operation.
+      // Since a large number of consensus pipes are not created at the same 
time, there is no
+      // serious waiting for locks. Therefore, the flush operation is always 
performed for the
+      // consensus pipe, and the lastFlushed timestamp is not updated here.
+      if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+        dataRegion.syncCloseAllWorkingTsFileProcessors();
+        LOGGER.info(
+            "Pipe {}@{}: finish to flush data region, took {} ms",
+            pipeName,
+            dataRegionId,
+            System.currentTimeMillis() - startHistoricalExtractionTime);
+      } else {
+        synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+          final long lastFlushedByPipeTime =
+              DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
+          if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
+            dataRegion.syncCloseAllWorkingTsFileProcessors();
+            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(
+                dataRegionId, System.currentTimeMillis());
+            LOGGER.info(
+                "Pipe {}@{}: finish to flush data region, took {} ms",
+                pipeName,
+                dataRegionId,
+                System.currentTimeMillis() - startHistoricalExtractionTime);
+          } else {
+            LOGGER.info(
+                "Pipe {}@{}: skip to flush data region, last flushed time {} 
ms ago",
+                pipeName,
+                dataRegionId,
+                System.currentTimeMillis() - lastFlushedByPipeTime);
+          }
         }
       }
 

Reply via email to