This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5debd8a757a [IOTDB-6202] Pipe: listen to tsfile load when realtime mode = forced-log (#11369)
5debd8a757a is described below
commit 5debd8a757a2244ea5632079c24370e0de197b73
Author: Caideyipi <[email protected]>
AuthorDate: Tue Oct 24 13:14:01 2023 +0800
[IOTDB-6202] Pipe: listen to tsfile load when realtime mode = forced-log (#11369)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 22 +++++++++--
.../event/realtime/PipeRealtimeEventFactory.java | 4 +-
.../PipeHistoricalDataRegionTsFileExtractor.java | 1 +
.../PipeRealtimeDataRegionLogExtractor.java | 45 +++++++++++++++++++---
.../listener/PipeInsertionDataNodeListener.java | 7 +++-
.../db/storageengine/dataregion/DataRegion.java | 2 +-
.../dataregion/memtable/TsFileProcessor.java | 2 +-
.../db/pipe/extractor/PipeRealtimeExtractTest.java | 2 +-
8 files changed, 69 insertions(+), 16 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index e8896746ba3..8914e56cbfa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -49,18 +49,20 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private final TsFileResource resource;
private File tsFile;
+ private final boolean isLoaded;
private final boolean isGeneratedByPipe;
private final AtomicBoolean isClosed;
-
private TsFileInsertionDataContainer dataContainer;
- public PipeTsFileInsertionEvent(TsFileResource resource, boolean
isGeneratedByPipe) {
- this(resource, isGeneratedByPipe, null, null, Long.MIN_VALUE,
Long.MAX_VALUE, false);
+ public PipeTsFileInsertionEvent(
+ TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
+ this(resource, isLoaded, isGeneratedByPipe, null, null, Long.MIN_VALUE,
Long.MAX_VALUE, false);
}
public PipeTsFileInsertionEvent(
TsFileResource resource,
+ boolean isLoaded,
boolean isGeneratedByPipe,
PipeTaskMeta pipeTaskMeta,
String pattern,
@@ -80,6 +82,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
this.resource = resource;
tsFile = resource.getTsFile();
+ this.isLoaded = isLoaded;
this.isGeneratedByPipe = isGeneratedByPipe;
isClosed = new AtomicBoolean(resource.isClosed());
@@ -114,6 +117,10 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
return tsFile;
}
+ public boolean getIsLoaded() {
+ return isLoaded;
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
@@ -164,7 +171,14 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
return new PipeTsFileInsertionEvent(
- resource, isGeneratedByPipe, pipeTaskMeta, pattern, startTime,
endTime, needParseTime);
+ resource,
+ isLoaded,
+ isGeneratedByPipe,
+ pipeTaskMeta,
+ pattern,
+ startTime,
+ endTime,
+ needParseTime);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 3a95f9ccf26..97c4bb8eade 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -32,9 +32,9 @@ public class PipeRealtimeEventFactory {
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new
TsFileEpochManager();
public static PipeRealtimeEvent createRealtimeEvent(
- TsFileResource resource, boolean isGeneratedByPipe) {
+ TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
- new PipeTsFileInsertionEvent(resource, isGeneratedByPipe), resource);
+ new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe),
resource);
}
public static PipeRealtimeEvent createRealtimeEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 80e22d8c220..1bd362166d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -284,6 +284,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
new PipeTsFileInsertionEvent(
resource,
false,
+ false,
pipeTaskMeta,
pattern,
historicalDataExtractionStartTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index bb796ec380a..c309bd2c620 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -22,10 +22,12 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,25 +39,57 @@ public class PipeRealtimeDataRegionLogExtractor extends
PipeRealtimeDataRegionEx
@Override
protected void doExtract(PipeRealtimeEvent event) {
- if (event.getEvent() instanceof PipeHeartbeatEvent) {
+ final Event eventToExtract = event.getEvent();
+
+ if (eventToExtract instanceof TabletInsertionEvent) {
+ extractTabletInsertion(event);
+ } else if (eventToExtract instanceof TsFileInsertionEvent) {
+ extractTsFileInsertion(event);
+ } else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
- return;
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported event type %s for hybrid realtime extractor %s",
+ eventToExtract.getClass(), this));
}
+ }
+ private void extractTabletInsertion(PipeRealtimeEvent event) {
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TABLET);
- if (!(event.getEvent() instanceof TabletInsertionEvent)) {
+ if (!pendingQueue.waitedOffer(event)) {
+ // this would not happen, but just in case.
+ // pendingQueue is unbounded, so it should never reach capacity.
+ final String errorMessage =
+ String.format(
+ "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s
"
+ + "has reached capacity, discard tablet event %s, current
state %s",
+ this, event, event.getTsFileEpoch().getState(this));
+ LOGGER.error(errorMessage);
+ PipeAgent.runtime().report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
+
+ // ignore this event.
+
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(),
false);
+ }
+ }
+
+ private void extractTsFileInsertion(PipeRealtimeEvent event) {
+ if (!((PipeTsFileInsertionEvent) event.getEvent()).getIsLoaded()) {
+ // only loaded tsfile can be extracted by this extractor. Ignore this
event.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(),
false);
return;
}
+ event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
+
if (!pendingQueue.waitedOffer(event)) {
// this would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
final String errorMessage =
String.format(
"extract: pending queue of PipeRealtimeDataRegionLogExtractor %s
"
- + "has reached capacity, discard tablet event %s, current
state %s",
+ + "has reached capacity, discard loaded tsFile event %s,
current state %s",
this, event, event.getTsFileEpoch().getState(this));
LOGGER.error(errorMessage);
PipeAgent.runtime().report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
@@ -100,7 +134,8 @@ public class PipeRealtimeDataRegionLogExtractor extends
PipeRealtimeDataRegionEx
@Override
public boolean isNeedListenToTsFile() {
- return false;
+ // Only listen to loaded tsFiles
+ return true;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index 5bee5216451..f65cd12a6ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -92,7 +92,10 @@ public class PipeInsertionDataNodeListener {
//////////////////////////// listen to events ////////////////////////////
public void listenToTsFile(
- String dataRegionId, TsFileResource tsFileResource, boolean
isGeneratedByPipe) {
+ String dataRegionId,
+ TsFileResource tsFileResource,
+ boolean isLoaded,
+ boolean isGeneratedByPipe) {
// We don't judge whether listenToTsFileExtractorCount.get() == 0 here on
purpose
// because extractors may use tsfile events when some exceptions occur in
the
// insert nodes listening process.
@@ -105,7 +108,7 @@ public class PipeInsertionDataNodeListener {
}
assigner.publishToAssign(
- PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource,
isGeneratedByPipe));
+ PipeRealtimeEventFactory.createRealtimeEvent(tsFileResource, isLoaded,
isGeneratedByPipe));
}
public void listenToInsertNode(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 5723fd62cc5..8bce9f4f6ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2205,7 +2205,7 @@ public class DataRegion implements IDataRegionForQuery {
tsfileToBeInserted, newTsFileResource, newFilePartitionId,
deleteOriginFile);
PipeInsertionDataNodeListener.getInstance()
- .listenToTsFile(dataRegionId, newTsFileResource, isGeneratedByPipe);
+ .listenToTsFile(dataRegionId, newTsFileResource, true,
isGeneratedByPipe);
FileMetrics.getInstance()
.addTsFile(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 6f0e6966b06..1363128e181 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -900,7 +900,7 @@ public class TsFileProcessor {
try {
PipeInsertionDataNodeListener.getInstance()
.listenToTsFile(
- dataRegionInfo.getDataRegion().getDataRegionId(),
tsFileResource, false);
+ dataRegionInfo.getDataRegion().getDataRegionId(),
tsFileResource, false, false);
// When invoke closing TsFile after insert data to memTable, we
shouldn't flush until invoke
// flushing memTable in System module.
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index 1900ff539bb..50da3cfc54a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -287,7 +287,7 @@ public class PipeRealtimeExtractTest {
false),
resource);
PipeInsertionDataNodeListener.getInstance()
- .listenToTsFile(dataRegionId, resource, false);
+ .listenToTsFile(dataRegionId, resource, false, false);
}
});
}