This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch duplicated-events
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c0cd686ef8bc9b6ac8faaea186b9278dac49738e
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jul 8 17:57:54 2024 +0800

    Pipe/Subscription: prevent duplicate tablet/tsfile transmission
---
 .../dataregion/IoTDBDataRegionExtractor.java       | 26 +++++++++++++++++-----
 .../PipeHistoricalDataRegionExtractor.java         |  2 ++
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 18 +++++++++++++++
 3 files changed, 40 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index d89b223df5b..8d77e4ef8b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -385,12 +385,26 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
   private void startHistoricalExtractorAndRealtimeExtractor(
       final AtomicReference<Exception> exceptionHolder) {
     try {
-      // Start realtimeExtractor first to avoid losing data. This may cause 
some
-      // retransmission, yet it is OK according to the idempotency of IoTDB.
-      // Note: The order of historical collection is flushing data -> adding 
all tsFile events.
-      // There can still be writing when tsFile events are added. If we start
-      // realtimeExtractor after the process, then this part of data will be 
lost.
-      realtimeExtractor.start();
+      // Assign the realtime extractor starter to the historical extractor.
+      // The historical extractor will start the realtime extractor once it 
has finished starting.
+      // The historical extractor will hold locks while starting itself and 
the realtime extractor.
+      // Then no events will be lost or duplicated.
+      historicalExtractor.assignRealtimeExtractorStarter(
+          () -> {
+            try {
+              realtimeExtractor.start();
+            } catch (Exception e) {
+              throw new PipeException(
+                  String.format(
+                      "Pipe %s@%s: Start realtime extractor %s error. Reason: 
%s",
+                      pipeName,
+                      regionId,
+                      realtimeExtractor.getClass().getSimpleName(),
+                      e.getMessage()),
+                  e);
+            }
+          });
+
       historicalExtractor.start();
     } catch (final Exception e) {
       exceptionHolder.set(e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionExtractor.java
index 067c5143c78..a6d1c3fbafb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionExtractor.java
@@ -26,4 +26,6 @@ public interface PipeHistoricalDataRegionExtractor extends 
PipeExtractor {
   boolean hasConsumedAll();
 
   int getPendingQueueSize();
+
+  void assignRealtimeExtractorStarter(final Runnable runnable);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index f89d85f0d97..8cea44af50b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -120,6 +120,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
   private boolean isTerminateSignalSent = false;
 
+  private Runnable realtimeExtractorStarter;
+
   private Queue<TsFileResource> pendingQueue;
 
   @Override
@@ -377,6 +379,11 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
         StorageEngine.getInstance().getDataRegion(new 
DataRegionId(dataRegionId));
     if (Objects.isNull(dataRegion)) {
       pendingQueue = new ArrayDeque<>();
+
+      // Start realtime extractor after historical extractor
+      if (Objects.nonNull(realtimeExtractorStarter)) {
+        realtimeExtractorStarter.run();
+      }
       return;
     }
 
@@ -479,6 +486,11 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             resourceList.size(),
             originalSequenceTsFileCount + originalUnsequenceTsFileCount,
             System.currentTimeMillis() - startHistoricalExtractionTime);
+
+        // Start realtime extractor after historical extractor
+        if (Objects.nonNull(realtimeExtractorStarter)) {
+          realtimeExtractorStarter.run();
+        }
       } finally {
         tsFileManager.readUnlock();
       }
@@ -616,6 +628,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     return event;
   }
 
+  @Override
   public synchronized boolean hasConsumedAll() {
     // If the pendingQueue is null when the function is called, it
     // implies that the extractor only extracts deletion thus the
@@ -630,6 +643,11 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     return Objects.nonNull(pendingQueue) ? pendingQueue.size() : 0;
   }
 
+  @Override
+  public void assignRealtimeExtractorStarter(final Runnable 
realtimeExtractorStarter) {
+    this.realtimeExtractorStarter = realtimeExtractorStarter;
+  }
+
   @Override
   public synchronized void close() {
     if (Objects.nonNull(pendingQueue)) {

Reply via email to