This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new bba1af2efb2 Pipe: Fix startup failure of PipeHistoricalDataRegionTsFileExtractor due to unprepared StorageEngine (#13526) (#13540)
bba1af2efb2 is described below

commit bba1af2efb270750c2f8f8bdec4613abe0cfe666
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Sep 18 20:06:52 2024 +0800

    Pipe: Fix startup failure of PipeHistoricalDataRegionTsFileExtractor due to unprepared StorageEngine (#13526) (#13540)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    (cherry picked from commit 07f14753418def64d38228f0a16499a7d7408c3b)
    
    Co-authored-by: Zhenyu Luo <[email protected]>
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 310ed2640d3..98280f8b9ec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -120,6 +120,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
   private boolean isTerminateSignalSent = false;
 
+  private volatile boolean hasBeenStarted = false;
+
   private Queue<TsFileResource> pendingQueue;
 
   @Override
@@ -369,12 +371,18 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
   @Override
   public synchronized void start() {
-    if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+    if (!shouldExtractInsertion) {
+      hasBeenStarted = true;
       return;
     }
-    if (!shouldExtractInsertion) {
+    if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+      LOGGER.info(
+          "Pipe {}@{}: failed to start to extract historical TsFile, storage engine is not ready. Will retry later.",
+          pipeName,
+          dataRegionId);
       return;
     }
+    hasBeenStarted = true;
 
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
@@ -570,6 +578,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
   @Override
   public synchronized Event supply() {
+    if (!hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
+      start();
+    }
+
     if (Objects.isNull(pendingQueue)) {
       return null;
     }
@@ -639,9 +651,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   public synchronized boolean hasConsumedAll() {
     // If the pendingQueue is null when the function is called, it implies 
that the extractor only
     // extracts deletion thus the historical event has nothing to consume.
-    return Objects.isNull(pendingQueue)
-        || pendingQueue.isEmpty()
-            && (!shouldTerminatePipeOnAllHistoricalEventsConsumed || 
isTerminateSignalSent);
+    return hasBeenStarted
+        && (Objects.isNull(pendingQueue)
+            || pendingQueue.isEmpty()
+                && (!shouldTerminatePipeOnAllHistoricalEventsConsumed || 
isTerminateSignalSent));
   }
 
   @Override

Reply via email to