This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 37ce6d17ddb Pipe: Do not transfer historical tsFiles when restarting in
realtime-only mode (#15996)
37ce6d17ddb is described below
commit 37ce6d17ddb60ad44fbf99a05e8b1c8c45185161
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 22 18:19:28 2025 +0800
Pipe: Do not transfer historical tsFiles when restarting in realtime-only
mode (#15996)
---
.../dataregion/IoTDBDataRegionExtractor.java | 46 +++---------
...oricalDataRegionTsFileAndDeletionExtractor.java | 84 +++-------------------
2 files changed, 21 insertions(+), 109 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 8b46c1e31f7..aa89dd28d81 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -55,8 +55,6 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
@@ -132,7 +130,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
- private @Nullable PipeHistoricalDataRegionExtractor historicalExtractor;
+ private PipeHistoricalDataRegionExtractor historicalExtractor;
private PipeRealtimeDataRegionExtractor realtimeExtractor;
private DataRegionWatermarkInjector watermarkInjector;
@@ -297,22 +295,10 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
checkInvalidParameters(validator);
- if (validator
- .getParameters()
- .getBooleanOrDefault(SystemConstant.RESTART_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
- || validator
- .getParameters()
- .getBooleanOrDefault(
- Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
- EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE)) {
- // Do not flush or open historical extractor when historical tsFile is
disabled
- constructHistoricalExtractor();
- }
+ constructHistoricalExtractor();
constructRealtimeExtractor(validator.getParameters());
- if (Objects.nonNull(historicalExtractor)) {
- historicalExtractor.validate(validator);
- }
+ historicalExtractor.validate(validator);
realtimeExtractor.validate(validator);
}
@@ -536,9 +522,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
super.customize(parameters, configuration);
- if (Objects.nonNull(historicalExtractor)) {
- historicalExtractor.customize(parameters, configuration);
- }
+ historicalExtractor.customize(parameters, configuration);
realtimeExtractor.customize(parameters, configuration);
// Set watermark injector
@@ -582,9 +566,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
"Pipe {}@{}: Starting historical extractor {} and realtime extractor
{}.",
pipeName,
regionId,
- Objects.nonNull(historicalExtractor)
- ? historicalExtractor.getClass().getSimpleName()
- : null,
+ historicalExtractor.getClass().getSimpleName(),
realtimeExtractor.getClass().getSimpleName());
super.start();
@@ -619,9 +601,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
"Pipe {}@{}: Started historical extractor {} and realtime
extractor {} successfully within {} ms.",
pipeName,
regionId,
- Objects.nonNull(historicalExtractor)
- ? historicalExtractor.getClass().getSimpleName()
- : null,
+ historicalExtractor.getClass().getSimpleName(),
realtimeExtractor.getClass().getSimpleName(),
System.currentTimeMillis() - startTime);
return;
@@ -639,18 +619,14 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
// There can still be writing when tsFile events are added. If we start
// realtimeExtractor after the process, then this part of data will be
lost.
realtimeExtractor.start();
- if (Objects.nonNull(historicalExtractor)) {
- historicalExtractor.start();
- }
+ historicalExtractor.start();
} catch (final Exception e) {
exceptionHolder.set(e);
LOGGER.warn(
"Pipe {}@{}: Start historical extractor {} and realtime extractor {}
error.",
pipeName,
regionId,
- Objects.nonNull(historicalExtractor)
- ? historicalExtractor.getClass().getSimpleName()
- : null,
+ historicalExtractor.getClass().getSimpleName(),
realtimeExtractor.getClass().getSimpleName(),
e);
}
@@ -669,7 +645,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
}
Event event = null;
- if (Objects.nonNull(historicalExtractor) &&
!historicalExtractor.hasConsumedAll()) {
+ if (!historicalExtractor.hasConsumedAll()) {
event = historicalExtractor.supply();
} else {
if (Objects.nonNull(watermarkInjector)) {
@@ -699,9 +675,7 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
return;
}
- if (Objects.nonNull(historicalExtractor)) {
- historicalExtractor.close();
- }
+ historicalExtractor.close();
realtimeExtractor.close();
if (Objects.nonNull(taskID)) {
PipeDataRegionExtractorMetrics.getInstance().deregister(taskID);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index bdda4117432..7b765c143a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -53,7 +53,6 @@ import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.utils.DateTimeUtils;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -141,7 +140,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
private boolean isHistoricalExtractorEnabled = false;
private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event
time
private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
- private long historicalDataExtractionTimeLowerBound; // Arrival time
private boolean sloppyTimeRange; // true to disable time range filter after
extraction
private boolean sloppyPattern; // true to disable pattern filter after
extraction
@@ -263,17 +261,14 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
try {
historicalDataExtractionStartTime =
- isHistoricalExtractorEnabled
- && parameters.hasAnyAttributes(
- EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY)
+ parameters.hasAnyAttributes(
+ EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY)
?
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY))
: Long.MIN_VALUE;
historicalDataExtractionEndTime =
- isHistoricalExtractorEnabled
- && parameters.hasAnyAttributes(
- EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY)
+ parameters.hasAnyAttributes(EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY)
?
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
parameters.getStringByKeys(
EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY))
@@ -342,46 +337,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
}
}
- // Enable historical extractor by default
- historicalDataExtractionTimeLowerBound =
- isHistoricalExtractorEnabled
- ? Long.MIN_VALUE
- // We define the realtime data as the data generated after the
creation time
- // of the pipe from user's perspective. But we still need to use
- // PipeHistoricalDataRegionExtractor to extract the realtime data
generated between the
- // creation time of the pipe and the time when the pipe starts,
because those data
- // can not be listened by PipeRealtimeDataRegionExtractor, and
should be extracted by
- // PipeHistoricalDataRegionExtractor from implementation
perspective.
- : environment.getCreationTime();
-
- // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the
realtime only mode.
- // realtime only mode -> (historicalDataExtractionTimeLowerBound !=
Long.MIN_VALUE)
- //
- // Ensure that all data in the data region is flushed to disk before
extracting data.
- // This ensures the generation time of all newly generated TsFiles
(realtime data) after the
- // invocation of flushDataRegionAllTsFiles() is later than the
creationTime of the pipe
- // (historicalDataExtractionTimeLowerBound).
- //
- // Note that: the generation time of the TsFile is the time when the
TsFile is created, not
- // the time when the data is flushed to the TsFile.
- //
- // Then we can use the generation time of the TsFile to determine whether
the data in the
- // TsFile should be extracted by comparing the generation time of the
TsFile with the
- // historicalDataExtractionTimeLowerBound when starting the pipe in
realtime only mode.
- //
- // If we don't invoke flushDataRegionAllTsFiles() in the realtime only
mode, the data generated
- // between the creation time of the pipe the time when the pipe starts
will be lost.
- if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
- synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
- final long lastFlushedByPipeTime =
- DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
- if (System.currentTimeMillis() - lastFlushedByPipeTime >=
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
- flushDataRegionAllTsFiles();
- DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId,
System.currentTimeMillis());
- }
- }
- }
-
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
shouldTransferModFile =
parameters.getBooleanOrDefault(
@@ -597,8 +552,10 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
.peek(originalResourceList::add)
.filter(
resource ->
- // Some resource is marked as deleted but not removed
from the list.
- !resource.isDeleted()
+ isHistoricalExtractorEnabled
+ &&
+ // Some resource is marked as deleted but not
removed from the list.
+ !resource.isDeleted()
// Some resource is generated by pipe. We ignore
them if the pipe should
// not transfer pipe requests.
&& (!resource.isGeneratedByPipe() ||
isForwardingPipeRequests)
@@ -608,7 +565,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&&
isTsFileResourceOverlappedWithTimeRange(resource)
- &&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&&
mayTsFileResourceOverlappedWithPattern(resource)))
.collect(Collectors.toList());
filteredTsFileResources.addAll(sequenceTsFileResources);
@@ -618,8 +574,10 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
.peek(originalResourceList::add)
.filter(
resource ->
- // Some resource is marked as deleted but not removed
from the list.
- !resource.isDeleted()
+ isHistoricalExtractorEnabled
+ &&
+ // Some resource is marked as deleted but not
removed from the list.
+ !resource.isDeleted()
// Some resource is generated by pipe. We ignore
them if the pipe should
// not transfer pipe requests.
&& (!resource.isGeneratedByPipe() ||
isForwardingPipeRequests)
@@ -629,7 +587,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&&
isTsFileResourceOverlappedWithTimeRange(resource)
- &&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&&
mayTsFileResourceOverlappedWithPattern(resource)))
.collect(Collectors.toList());
filteredTsFileResources.addAll(unsequenceTsFileResources);
@@ -759,25 +716,6 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
&& historicalDataExtractionEndTime >= resource.getFileEndTime();
}
- private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(final
TsFileResource resource) {
- try {
- return historicalDataExtractionTimeLowerBound
- <=
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
- } catch (final IOException e) {
- LOGGER.warn(
- "Pipe {}@{}: failed to get the generation time of TsFile {}, extract
it anyway"
- + " (historical data extraction time lower bound: {})",
- pipeName,
- dataRegionId,
- resource.getTsFilePath(),
- historicalDataExtractionTimeLowerBound,
- e);
- // If failed to get the generation time of the TsFile, we will extract
the data in the TsFile
- // anyway.
- return true;
- }
- }
-
private void extractDeletions(
final DeletionResourceManager deletionResourceManager,
final List<PersistentResource> resourceList) {