This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c613c508ff2ea2790dbb86ae70228ecf28fe3716 Author: Caideyipi <[email protected]> AuthorDate: Tue Jul 22 18:19:28 2025 +0800 Pipe: Do not transfer historical tsFiles when restarts in realtime-only mode (#15996) (cherry picked from commit 37ce6d17ddb60ad44fbf99a05e8b1c8c45185161) --- .../dataregion/IoTDBDataRegionExtractor.java | 46 +++--------- ...oricalDataRegionTsFileAndDeletionExtractor.java | 84 +++------------------- 2 files changed, 21 insertions(+), 109 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index 8b46c1e31f7..aa89dd28d81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -55,8 +55,6 @@ import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.util.Arrays; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -132,7 +130,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class); - private @Nullable PipeHistoricalDataRegionExtractor historicalExtractor; + private PipeHistoricalDataRegionExtractor historicalExtractor; private PipeRealtimeDataRegionExtractor realtimeExtractor; private DataRegionWatermarkInjector watermarkInjector; @@ -297,22 +295,10 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { checkInvalidParameters(validator); - if (validator - .getParameters() - .getBooleanOrDefault(SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE) - || validator - .getParameters() - .getBooleanOrDefault( - Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), - EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE)) { - // Do not flush or open historical extractor when historical tsFile is disabled - constructHistoricalExtractor(); - } + constructHistoricalExtractor(); constructRealtimeExtractor(validator.getParameters()); - if (Objects.nonNull(historicalExtractor)) { - historicalExtractor.validate(validator); - } + historicalExtractor.validate(validator); realtimeExtractor.validate(validator); } @@ -536,9 +522,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { super.customize(parameters, configuration); - if (Objects.nonNull(historicalExtractor)) { - historicalExtractor.customize(parameters, configuration); - } + historicalExtractor.customize(parameters, configuration); realtimeExtractor.customize(parameters, configuration); // Set watermark injector @@ -582,9 +566,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { "Pipe {}@{}: Starting historical extractor {} and realtime extractor {}.", pipeName, regionId, - Objects.nonNull(historicalExtractor) - ? historicalExtractor.getClass().getSimpleName() - : null, + historicalExtractor.getClass().getSimpleName(), realtimeExtractor.getClass().getSimpleName()); super.start(); @@ -619,9 +601,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { "Pipe {}@{}: Started historical extractor {} and realtime extractor {} successfully within {} ms.", pipeName, regionId, - Objects.nonNull(historicalExtractor) - ? historicalExtractor.getClass().getSimpleName() - : null, + historicalExtractor.getClass().getSimpleName(), realtimeExtractor.getClass().getSimpleName(), System.currentTimeMillis() - startTime); return; @@ -639,18 +619,14 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { // There can still be writing when tsFile events are added. If we start // realtimeExtractor after the process, then this part of data will be lost. realtimeExtractor.start(); - if (Objects.nonNull(historicalExtractor)) { - historicalExtractor.start(); - } + historicalExtractor.start(); } catch (final Exception e) { exceptionHolder.set(e); LOGGER.warn( "Pipe {}@{}: Start historical extractor {} and realtime extractor {} error.", pipeName, regionId, - Objects.nonNull(historicalExtractor) - ? historicalExtractor.getClass().getSimpleName() - : null, + historicalExtractor.getClass().getSimpleName(), realtimeExtractor.getClass().getSimpleName(), e); } @@ -669,7 +645,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { } Event event = null; - if (Objects.nonNull(historicalExtractor) && !historicalExtractor.hasConsumedAll()) { + if (!historicalExtractor.hasConsumedAll()) { event = historicalExtractor.supply(); } else { if (Objects.nonNull(watermarkInjector)) { @@ -699,9 +675,7 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { return; } - if (Objects.nonNull(historicalExtractor)) { - historicalExtractor.close(); - } + historicalExtractor.close(); realtimeExtractor.close(); if (Objects.nonNull(taskID)) { PipeDataRegionExtractorMetrics.getInstance().deregister(taskID); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index bdda4117432..7b765c143a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -53,7 +53,6 @@ import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -141,7 +140,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor private boolean isHistoricalExtractorEnabled = false; private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time - private long historicalDataExtractionTimeLowerBound; // Arrival time private boolean sloppyTimeRange; // true to disable time range filter after extraction private boolean sloppyPattern; // true to disable pattern filter after extraction @@ -263,17 +261,14 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor try { historicalDataExtractionStartTime = - isHistoricalExtractorEnabled - && parameters.hasAnyAttributes( - EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY) + parameters.hasAnyAttributes( + EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY) ? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone( parameters.getStringByKeys( EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY)) : Long.MIN_VALUE; historicalDataExtractionEndTime = - isHistoricalExtractorEnabled - && parameters.hasAnyAttributes( - EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY) + parameters.hasAnyAttributes(EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY) ? DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone( parameters.getStringByKeys( EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY)) @@ -342,46 +337,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor } } - // Enable historical extractor by default - historicalDataExtractionTimeLowerBound = - isHistoricalExtractorEnabled - ? Long.MIN_VALUE - // We define the realtime data as the data generated after the creation time - // of the pipe from user's perspective. But we still need to use - // PipeHistoricalDataRegionExtractor to extract the realtime data generated between the - // creation time of the pipe and the time when the pipe starts, because those data - // can not be listened by PipeRealtimeDataRegionExtractor, and should be extracted by - // PipeHistoricalDataRegionExtractor from implementation perspective. - : environment.getCreationTime(); - - // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the realtime only mode. - // realtime only mode -> (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) - // - // Ensure that all data in the data region is flushed to disk before extracting data. - // This ensures the generation time of all newly generated TsFiles (realtime data) after the - // invocation of flushDataRegionAllTsFiles() is later than the creationTime of the pipe - // (historicalDataExtractionTimeLowerBound). - // - // Note that: the generation time of the TsFile is the time when the TsFile is created, not - // the time when the data is flushed to the TsFile. - // - // Then we can use the generation time of the TsFile to determine whether the data in the - // TsFile should be extracted by comparing the generation time of the TsFile with the - // historicalDataExtractionTimeLowerBound when starting the pipe in realtime only mode. - // - // If we don't invoke flushDataRegionAllTsFiles() in the realtime only mode, the data generated - // between the creation time of the pipe the time when the pipe starts will be lost. - if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) { - synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { - final long lastFlushedByPipeTime = - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId); - if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) { - flushDataRegionAllTsFiles(); - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); - } - } - } - if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { shouldTransferModFile = parameters.getBooleanOrDefault( @@ -597,8 +552,10 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor .peek(originalResourceList::add) .filter( resource -> - // Some resource is marked as deleted but not removed from the list. - !resource.isDeleted() + isHistoricalExtractorEnabled + && + // Some resource is marked as deleted but not removed from the list. + !resource.isDeleted() // Some resource is generated by pipe. We ignore them if the pipe should // not transfer pipe requests. && (!resource.isGeneratedByPipe() || isForwardingPipeRequests) @@ -608,7 +565,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor !resource.isClosed() || mayTsFileContainUnprocessedData(resource) && isTsFileResourceOverlappedWithTimeRange(resource) - && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) && mayTsFileResourceOverlappedWithPattern(resource))) .collect(Collectors.toList()); filteredTsFileResources.addAll(sequenceTsFileResources); @@ -618,8 +574,10 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor .peek(originalResourceList::add) .filter( resource -> - // Some resource is marked as deleted but not removed from the list. - !resource.isDeleted() + isHistoricalExtractorEnabled + && + // Some resource is marked as deleted but not removed from the list. + !resource.isDeleted() // Some resource is generated by pipe. We ignore them if the pipe should // not transfer pipe requests. && (!resource.isGeneratedByPipe() || isForwardingPipeRequests) @@ -629,7 +587,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor !resource.isClosed() || mayTsFileContainUnprocessedData(resource) && isTsFileResourceOverlappedWithTimeRange(resource) - && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) && mayTsFileResourceOverlappedWithPattern(resource))) .collect(Collectors.toList()); filteredTsFileResources.addAll(unsequenceTsFileResources); @@ -759,25 +716,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor && historicalDataExtractionEndTime >= resource.getFileEndTime(); } - private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(final TsFileResource resource) { - try { - return historicalDataExtractionTimeLowerBound - <= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime(); - } catch (final IOException e) { - LOGGER.warn( - "Pipe {}@{}: failed to get the generation time of TsFile {}, extract it anyway" - + " (historical data extraction time lower bound: {})", - pipeName, - dataRegionId, - resource.getTsFilePath(), - historicalDataExtractionTimeLowerBound, - e); - // If failed to get the generation time of the TsFile, we will extract the data in the TsFile - // anyway. - return true; - } - } - private void extractDeletions( final DeletionResourceManager deletionResourceManager, final List<PersistentResource> resourceList) {
