This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch duplicated-events in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c0cd686ef8bc9b6ac8faaea186b9278dac49738e Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Jul 8 17:57:54 2024 +0800 Pipe/Subscription: prevent duplicate tablet/tsfile transmission --- .../dataregion/IoTDBDataRegionExtractor.java | 26 +++++++++++++++++----- .../PipeHistoricalDataRegionExtractor.java | 2 ++ .../PipeHistoricalDataRegionTsFileExtractor.java | 18 +++++++++++++++ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index d89b223df5b..8d77e4ef8b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -385,12 +385,26 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { private void startHistoricalExtractorAndRealtimeExtractor( final AtomicReference<Exception> exceptionHolder) { try { - // Start realtimeExtractor first to avoid losing data. This may cause some - // retransmission, yet it is OK according to the idempotency of IoTDB. - // Note: The order of historical collection is flushing data -> adding all tsFile events. - // There can still be writing when tsFile events are added. If we start - // realtimeExtractor after the process, then this part of data will be lost. - realtimeExtractor.start(); + // Assign the realtime extractor starter to the historical extractor. + // The historical extractor will start the realtime extractor once it has finished starting itself. + // The historical extractor holds locks while starting both itself and the realtime extractor, + // so that no events are lost or duplicated.
+ historicalExtractor.assignRealtimeExtractorStarter( + () -> { + try { + realtimeExtractor.start(); + } catch (Exception e) { + throw new PipeException( + String.format( + "Pipe %s@%s: Start realtime extractor %s error. Reason: %s", + pipeName, + regionId, + realtimeExtractor.getClass().getSimpleName(), + e.getMessage()), + e); + } + }); + historicalExtractor.start(); } catch (final Exception e) { exceptionHolder.set(e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionExtractor.java index 067c5143c78..a6d1c3fbafb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionExtractor.java @@ -26,4 +26,6 @@ public interface PipeHistoricalDataRegionExtractor extends PipeExtractor { boolean hasConsumedAll(); int getPendingQueueSize(); + + void assignRealtimeExtractorStarter(final Runnable runnable); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index f89d85f0d97..8cea44af50b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -120,6 +120,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private boolean 
shouldTerminatePipeOnAllHistoricalEventsConsumed; private boolean isTerminateSignalSent = false; + private Runnable realtimeExtractorStarter; + private Queue<TsFileResource> pendingQueue; @Override @@ -377,6 +379,11 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId)); if (Objects.isNull(dataRegion)) { pendingQueue = new ArrayDeque<>(); + + // Start realtime extractor after historical extractor + if (Objects.nonNull(realtimeExtractorStarter)) { + realtimeExtractorStarter.run(); + } return; } @@ -479,6 +486,11 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa resourceList.size(), originalSequenceTsFileCount + originalUnsequenceTsFileCount, System.currentTimeMillis() - startHistoricalExtractionTime); + + // Start realtime extractor after historical extractor + if (Objects.nonNull(realtimeExtractorStarter)) { + realtimeExtractorStarter.run(); + } } finally { tsFileManager.readUnlock(); } @@ -616,6 +628,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa return event; } + @Override public synchronized boolean hasConsumedAll() { // If the pendingQueue is null when the function is called, it // implies that the extractor only extracts deletion thus the @@ -630,6 +643,11 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa return Objects.nonNull(pendingQueue) ? pendingQueue.size() : 0; } + @Override + public void assignRealtimeExtractorStarter(final Runnable realtimeExtractorStarter) { + this.realtimeExtractorStarter = realtimeExtractorStarter; + } + @Override public synchronized void close() { if (Objects.nonNull(pendingQueue)) {
