This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a67b0c02b6e Pipe: fixed potential point-loss bug caused by cancelled
flush of historical extractor (#12056)
a67b0c02b6e is described below
commit a67b0c02b6edae0916498869ed33dc04cb1458b3
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 21 14:16:50 2024 +0800
Pipe: fixed potential point-loss bug caused by cancelled flush of
historical extractor (#12056)
The data loss situation can be caused by the following operations:
1. PipeA: start historical data extraction with flush
2. Data insertion
3. PipeB: start realtime data extraction
4. PipeB: start historical data extraction without flush
5. The data inserted in step 2 is not captured by PipeB, and if its tsfile
epoch's state is USING_TABLET, the tsfile event will be ignored, so the
data in that tsfile epoch is lost.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 ++++
.../PipeRealtimeDataRegionHybridExtractor.java | 22 +++++++++++++++++++++-
.../PipeRealtimeDataRegionLogExtractor.java | 12 +++++++++---
.../pipe/extractor/realtime/epoch/TsFileEpoch.java | 14 ++++++++++++++
.../realtime/epoch/TsFileEpochManager.java | 5 ++++-
5 files changed, 52 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 532ca5a1a7e..412e4143a03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -116,6 +116,10 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
return isLoaded;
}
+ public long getFileStartTime() {
+ return resource.getFileStartTime();
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 568c96b5263..63a4098cff9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -138,7 +139,26 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
case USING_TSFILE:
return TsFileEpoch.State.USING_TSFILE;
case USING_TABLET:
- return TsFileEpoch.State.USING_TABLET;
+ if (((PipeTsFileInsertionEvent)
event.getEvent()).getFileStartTime()
+ < event.getTsFileEpoch().getInsertNodeMinTime()) {
+ // Some insert nodes in the tsfile epoch are not captured
by pipe, so we should
+ // capture the tsfile event to make sure all data in the
tsfile epoch can be
+ // extracted.
+ //
+ // The situation can be caused by the following operations:
+ // 1. PipeA: start historical data extraction with flush
+ // 2. Data insertion
+ // 3. PipeB: start realtime data extraction
+ // 4. PipeB: start historical data extraction without
flush
+ // 5. Data inserted in the step2 is not captured by
PipeB, and if its tsfile
+ // epoch's state is USING_TABLET, the tsfile event
will be ignored, which
+ // will cause the data loss in the tsfile epoch.
+ return TsFileEpoch.State.USING_BOTH;
+ } else {
+ // All data in the tsfile epoch has been extracted in
tablet mode, so we should
+ // simply keep the state of the tsfile epoch and discard
the tsfile event.
+ return TsFileEpoch.State.USING_TABLET;
+ }
case USING_BOTH:
default:
return TsFileEpoch.State.USING_BOTH;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 2d857bcad9b..68f21ea283a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -75,8 +75,14 @@ public class PipeRealtimeDataRegionLogExtractor extends
PipeRealtimeDataRegionEx
}
private void extractTsFileInsertion(PipeRealtimeEvent event) {
- if (!((PipeTsFileInsertionEvent) event.getEvent()).getIsLoaded()) {
- // only loaded tsfile can be extracted by this extractor. Ignore this
event.
+ final PipeTsFileInsertionEvent tsFileInsertionEvent =
+ (PipeTsFileInsertionEvent) event.getEvent();
+ if (!(tsFileInsertionEvent.getIsLoaded()
+ // some insert nodes in the tsfile epoch are not captured by pipe
+ || tsFileInsertionEvent.getFileStartTime()
+ < event.getTsFileEpoch().getInsertNodeMinTime())) {
+ // All data in the tsfile epoch has been extracted in tablet mode, so we
should
+ // simply ignore this event.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(),
false);
return;
}
@@ -134,7 +140,7 @@ public class PipeRealtimeDataRegionLogExtractor extends
PipeRealtimeDataRegionEx
@Override
public boolean isNeedListenToTsFile() {
- // Only listen to loaded tsFiles
+ // Only listen to tsFiles that can't be represented by insertNodes
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
index b0ec157f016..1ccc4e979dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class TsFileEpoch {
@@ -31,10 +32,12 @@ public class TsFileEpoch {
private final String filePath;
private final ConcurrentMap<PipeRealtimeDataRegionExtractor,
AtomicReference<State>>
dataRegionExtractor2State;
+ private final AtomicLong insertNodeMinTime;
public TsFileEpoch(String filePath) {
this.filePath = filePath;
this.dataRegionExtractor2State = new ConcurrentHashMap<>();
+ this.insertNodeMinTime = new AtomicLong(Long.MAX_VALUE);
}
public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor)
{
@@ -57,6 +60,14 @@ public class TsFileEpoch {
.setRecentProcessedTsFileEpochState(extractor.getTaskID(),
state.get()));
}
+ public void updateInsertNodeMinTime(long newComingMinTime) {
+ insertNodeMinTime.updateAndGet(recordedMinTime ->
Math.min(recordedMinTime, newComingMinTime));
+ }
+
+ public long getInsertNodeMinTime() {
+ return insertNodeMinTime.get();
+ }
+
@Override
public String toString() {
return "TsFileEpoch{"
@@ -65,6 +76,9 @@ public class TsFileEpoch {
+ '\''
+ ", dataRegionExtractor2State="
+ dataRegionExtractor2State
+ + '\''
+ + ", insertNodeMinTime="
+ + insertNodeMinTime.get()
+ '}';
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
index fdd2ad85c62..a1ad2f49ec9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
@@ -69,9 +69,12 @@ public class TsFileEpochManager {
public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent(
PipeInsertNodeTabletInsertionEvent event, InsertNode node,
TsFileResource resource) {
+ final TsFileEpoch epoch =
+ filePath2Epoch.computeIfAbsent(resource.getTsFilePath(),
TsFileEpoch::new);
+ epoch.updateInsertNodeMinTime(node.getMinTime());
return new PipeRealtimeEvent(
event,
- filePath2Epoch.computeIfAbsent(resource.getTsFilePath(),
TsFileEpoch::new),
+ epoch,
Collections.singletonMap(node.getDevicePath().getFullPath(),
node.getMeasurements()),
event.getPattern());
}