This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5debd8a757a [IOTDB-6202] Pipe: listen to tsfile load when realtime 
mode = forced-log (#11369)
5debd8a757a is described below

commit 5debd8a757a2244ea5632079c24370e0de197b73
Author: Caideyipi <[email protected]>
AuthorDate: Tue Oct 24 13:14:01 2023 +0800

    [IOTDB-6202] Pipe: listen to tsfile load when realtime mode = forced-log 
(#11369)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 22 +++++++++--
 .../event/realtime/PipeRealtimeEventFactory.java   |  4 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  1 +
 .../PipeRealtimeDataRegionLogExtractor.java        | 45 +++++++++++++++++++---
 .../listener/PipeInsertionDataNodeListener.java    |  7 +++-
 .../db/storageengine/dataregion/DataRegion.java    |  2 +-
 .../dataregion/memtable/TsFileProcessor.java       |  2 +-
 .../db/pipe/extractor/PipeRealtimeExtractTest.java |  2 +-
 8 files changed, 69 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index e8896746ba3..8914e56cbfa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -49,18 +49,20 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   private final TsFileResource resource;
   private File tsFile;
 
+  private final boolean isLoaded;
   private final boolean isGeneratedByPipe;
 
   private final AtomicBoolean isClosed;
-
   private TsFileInsertionDataContainer dataContainer;
 
-  public PipeTsFileInsertionEvent(TsFileResource resource, boolean 
isGeneratedByPipe) {
-    this(resource, isGeneratedByPipe, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE, false);
+  public PipeTsFileInsertionEvent(
+      TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
+    this(resource, isLoaded, isGeneratedByPipe, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE, false);
   }
 
   public PipeTsFileInsertionEvent(
       TsFileResource resource,
+      boolean isLoaded,
       boolean isGeneratedByPipe,
       PipeTaskMeta pipeTaskMeta,
       String pattern,
@@ -80,6 +82,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     this.resource = resource;
     tsFile = resource.getTsFile();
 
+    this.isLoaded = isLoaded;
     this.isGeneratedByPipe = isGeneratedByPipe;
 
     isClosed = new AtomicBoolean(resource.isClosed());
@@ -114,6 +117,10 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
     return tsFile;
   }
 
+  public boolean getIsLoaded() {
+    return isLoaded;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
@@ -164,7 +171,14 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeTsFileInsertionEvent(
-        resource, isGeneratedByPipe, pipeTaskMeta, pattern, startTime, 
endTime, needParseTime);
+        resource,
+        isLoaded,
+        isGeneratedByPipe,
+        pipeTaskMeta,
+        pattern,
+        startTime,
+        endTime,
+        needParseTime);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 3a95f9ccf26..97c4bb8eade 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -32,9 +32,9 @@ public class PipeRealtimeEventFactory {
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new 
TsFileEpochManager();
 
   public static PipeRealtimeEvent createRealtimeEvent(
-      TsFileResource resource, boolean isGeneratedByPipe) {
+      TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
-        new PipeTsFileInsertionEvent(resource, isGeneratedByPipe), resource);
+        new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe), 
resource);
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 80e22d8c220..1bd362166d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -284,6 +284,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
         new PipeTsFileInsertionEvent(
             resource,
             false,
+            false,
             pipeTaskMeta,
             pattern,
             historicalDataExtractionStartTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index bb796ec380a..c309bd2c620 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -22,10 +22,12 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,25 +39,57 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
 
   @Override
   protected void doExtract(PipeRealtimeEvent event) {
-    if (event.getEvent() instanceof PipeHeartbeatEvent) {
+    final Event eventToExtract = event.getEvent();
+
+    if (eventToExtract instanceof TabletInsertionEvent) {
+      extractTabletInsertion(event);
+    } else if (eventToExtract instanceof TsFileInsertionEvent) {
+      extractTsFileInsertion(event);
+    } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
-      return;
+    } else {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Unsupported event type %s for hybrid realtime extractor %s",
+              eventToExtract.getClass(), this));
     }
+  }
 
+  private void extractTabletInsertion(PipeRealtimeEvent event) {
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TABLET);
 
-    if (!(event.getEvent() instanceof TabletInsertionEvent)) {
+    if (!pendingQueue.waitedOffer(event)) {
+      // this would not happen, but just in case.
+      // pendingQueue is unbounded, so it should never reach capacity.
+      final String errorMessage =
+          String.format(
+              "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s 
"
+                  + "has reached capacity, discard tablet event %s, current 
state %s",
+              this, event, event.getTsFileEpoch().getState(this));
+      LOGGER.error(errorMessage);
+      PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
+
+      // ignore this event.
+      
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(),
 false);
+    }
+  }
+
+  private void extractTsFileInsertion(PipeRealtimeEvent event) {
+    if (!((PipeTsFileInsertionEvent) event.getEvent()).getIsLoaded()) {
+      // only loaded tsfile can be extracted by this extractor. Ignore this 
event.
       
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(),
 false);
       return;
     }
 
+    event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
+
     if (!pendingQueue.waitedOffer(event)) {
       // this would not happen, but just in case.
       // pendingQueue is unbounded, so it should never reach capacity.
       final String errorMessage =
           String.format(
               "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s 
"
-                  + "has reached capacity, discard tablet event %s, current 
state %s",
+                  + "has reached capacity, discard loaded tsFile event %s, 
current state %s",
               this, event, event.getTsFileEpoch().getState(this));
       LOGGER.error(errorMessage);
       PipeAgent.runtime().report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
@@ -100,7 +134,8 @@ public class PipeRealtimeDataRegionLogExtractor extends 
PipeRealtimeDataRegionEx
 
   @Override
   public boolean isNeedListenToTsFile() {
-    return false;
+    // Only listen to loaded tsFiles
+    return true;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index 5bee5216451..f65cd12a6ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -92,7 +92,10 @@ public class PipeInsertionDataNodeListener {
   //////////////////////////// listen to events ////////////////////////////
 
   public void listenToTsFile(
-      String dataRegionId, TsFileResource tsFileResource, boolean 
isGeneratedByPipe) {
+      String dataRegionId,
+      TsFileResource tsFileResource,
+      boolean isLoaded,
+      boolean isGeneratedByPipe) {
     // We don't judge whether listenToTsFileExtractorCount.get() == 0 here on 
purpose
     // because extractors may use tsfile events when some exceptions occur in 
the
     // insert nodes listening process.
@@ -105,7 +108,7 @@ public class PipeInsertionDataNodeListener {
     }
 
     assigner.publishToAssign(
-        PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource, 
isGeneratedByPipe));
+        PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource, isLoaded, 
isGeneratedByPipe));
   }
 
   public void listenToInsertNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 5723fd62cc5..8bce9f4f6ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2205,7 +2205,7 @@ public class DataRegion implements IDataRegionForQuery {
           tsfileToBeInserted, newTsFileResource, newFilePartitionId, 
deleteOriginFile);
 
       PipeInsertionDataNodeListener.getInstance()
-          .listenToTsFile(dataRegionId, newTsFileResource, isGeneratedByPipe);
+          .listenToTsFile(dataRegionId, newTsFileResource, true, 
isGeneratedByPipe);
 
       FileMetrics.getInstance()
           .addTsFile(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 6f0e6966b06..1363128e181 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -900,7 +900,7 @@ public class TsFileProcessor {
       try {
         PipeInsertionDataNodeListener.getInstance()
             .listenToTsFile(
-                dataRegionInfo.getDataRegion().getDataRegionId(), 
tsFileResource, false);
+                dataRegionInfo.getDataRegion().getDataRegionId(), 
tsFileResource, false, false);
 
         // When invoke closing TsFile after insert data to memTable, we 
shouldn't flush until invoke
         // flushing memTable in System module.
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index 1900ff539bb..50da3cfc54a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -287,7 +287,7 @@ public class PipeRealtimeExtractTest {
                         false),
                     resource);
             PipeInsertionDataNodeListener.getInstance()
-                .listenToTsFile(dataRegionId, resource, false);
+                .listenToTsFile(dataRegionId, resource, false, false);
           }
         });
   }

Reply via email to