This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c613c508ff2ea2790dbb86ae70228ecf28fe3716
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 22 18:19:28 2025 +0800

    Pipe: Do not transfer historical tsFiles when restarting in realtime-only 
mode (#15996)
    
    (cherry picked from commit 37ce6d17ddb60ad44fbf99a05e8b1c8c45185161)
---
 .../dataregion/IoTDBDataRegionExtractor.java       | 46 +++---------
 ...oricalDataRegionTsFileAndDeletionExtractor.java | 84 +++-------------------
 2 files changed, 21 insertions(+), 109 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 8b46c1e31f7..aa89dd28d81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -55,8 +55,6 @@ import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
@@ -132,7 +130,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);
 
-  private @Nullable PipeHistoricalDataRegionExtractor historicalExtractor;
+  private PipeHistoricalDataRegionExtractor historicalExtractor;
   private PipeRealtimeDataRegionExtractor realtimeExtractor;
 
   private DataRegionWatermarkInjector watermarkInjector;
@@ -297,22 +295,10 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
     checkInvalidParameters(validator);
 
-    if (validator
-            .getParameters()
-            .getBooleanOrDefault(SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
-        || validator
-            .getParameters()
-            .getBooleanOrDefault(
-                Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
-                EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE)) {
-      // Do not flush or open historical extractor when historical tsFile is 
disabled
-      constructHistoricalExtractor();
-    }
+    constructHistoricalExtractor();
     constructRealtimeExtractor(validator.getParameters());
 
-    if (Objects.nonNull(historicalExtractor)) {
-      historicalExtractor.validate(validator);
-    }
+    historicalExtractor.validate(validator);
     realtimeExtractor.validate(validator);
   }
 
@@ -536,9 +522,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
     super.customize(parameters, configuration);
 
-    if (Objects.nonNull(historicalExtractor)) {
-      historicalExtractor.customize(parameters, configuration);
-    }
+    historicalExtractor.customize(parameters, configuration);
     realtimeExtractor.customize(parameters, configuration);
 
     // Set watermark injector
@@ -582,9 +566,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
         "Pipe {}@{}: Starting historical extractor {} and realtime extractor 
{}.",
         pipeName,
         regionId,
-        Objects.nonNull(historicalExtractor)
-            ? historicalExtractor.getClass().getSimpleName()
-            : null,
+        historicalExtractor.getClass().getSimpleName(),
         realtimeExtractor.getClass().getSimpleName());
 
     super.start();
@@ -619,9 +601,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
             "Pipe {}@{}: Started historical extractor {} and realtime 
extractor {} successfully within {} ms.",
             pipeName,
             regionId,
-            Objects.nonNull(historicalExtractor)
-                ? historicalExtractor.getClass().getSimpleName()
-                : null,
+            historicalExtractor.getClass().getSimpleName(),
             realtimeExtractor.getClass().getSimpleName(),
             System.currentTimeMillis() - startTime);
         return;
@@ -639,18 +619,14 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
       // There can still be writing when tsFile events are added. If we start
       // realtimeExtractor after the process, then this part of data will be 
lost.
       realtimeExtractor.start();
-      if (Objects.nonNull(historicalExtractor)) {
-        historicalExtractor.start();
-      }
+      historicalExtractor.start();
     } catch (final Exception e) {
       exceptionHolder.set(e);
       LOGGER.warn(
           "Pipe {}@{}: Start historical extractor {} and realtime extractor {} 
error.",
           pipeName,
           regionId,
-          Objects.nonNull(historicalExtractor)
-              ? historicalExtractor.getClass().getSimpleName()
-              : null,
+          historicalExtractor.getClass().getSimpleName(),
           realtimeExtractor.getClass().getSimpleName(),
           e);
     }
@@ -669,7 +645,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     }
 
     Event event = null;
-    if (Objects.nonNull(historicalExtractor) && 
!historicalExtractor.hasConsumedAll()) {
+    if (!historicalExtractor.hasConsumedAll()) {
       event = historicalExtractor.supply();
     } else {
       if (Objects.nonNull(watermarkInjector)) {
@@ -699,9 +675,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
       return;
     }
 
-    if (Objects.nonNull(historicalExtractor)) {
-      historicalExtractor.close();
-    }
+    historicalExtractor.close();
     realtimeExtractor.close();
     if (Objects.nonNull(taskID)) {
       PipeDataRegionExtractorMetrics.getInstance().deregister(taskID);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index bdda4117432..7b765c143a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -53,7 +53,6 @@ import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -141,7 +140,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
   private boolean isHistoricalExtractorEnabled = false;
   private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event 
time
   private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
-  private long historicalDataExtractionTimeLowerBound; // Arrival time
 
   private boolean sloppyTimeRange; // true to disable time range filter after 
extraction
   private boolean sloppyPattern; // true to disable pattern filter after 
extraction
@@ -263,17 +261,14 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
 
     try {
       historicalDataExtractionStartTime =
-          isHistoricalExtractorEnabled
-                  && parameters.hasAnyAttributes(
-                      EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY)
+          parameters.hasAnyAttributes(
+                  EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY)
               ? 
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
                   parameters.getStringByKeys(
                       EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY))
               : Long.MIN_VALUE;
       historicalDataExtractionEndTime =
-          isHistoricalExtractorEnabled
-                  && parameters.hasAnyAttributes(
-                      EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY)
+          parameters.hasAnyAttributes(EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY)
               ? 
DateTimeUtils.convertTimestampOrDatetimeStrToLongWithDefaultZone(
                   parameters.getStringByKeys(
                       EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY))
@@ -342,46 +337,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
       }
     }
 
-    // Enable historical extractor by default
-    historicalDataExtractionTimeLowerBound =
-        isHistoricalExtractorEnabled
-            ? Long.MIN_VALUE
-            // We define the realtime data as the data generated after the 
creation time
-            // of the pipe from user's perspective. But we still need to use
-            // PipeHistoricalDataRegionExtractor to extract the realtime data 
generated between the
-            // creation time of the pipe and the time when the pipe starts, 
because those data
-            // can not be listened by PipeRealtimeDataRegionExtractor, and 
should be extracted by
-            // PipeHistoricalDataRegionExtractor from implementation 
perspective.
-            : environment.getCreationTime();
-
-    // Only invoke flushDataRegionAllTsFiles() when the pipe runs in the 
realtime only mode.
-    // realtime only mode -> (historicalDataExtractionTimeLowerBound != 
Long.MIN_VALUE)
-    //
-    // Ensure that all data in the data region is flushed to disk before 
extracting data.
-    // This ensures the generation time of all newly generated TsFiles 
(realtime data) after the
-    // invocation of flushDataRegionAllTsFiles() is later than the 
creationTime of the pipe
-    // (historicalDataExtractionTimeLowerBound).
-    //
-    // Note that: the generation time of the TsFile is the time when the 
TsFile is created, not
-    // the time when the data is flushed to the TsFile.
-    //
-    // Then we can use the generation time of the TsFile to determine whether 
the data in the
-    // TsFile should be extracted by comparing the generation time of the 
TsFile with the
-    // historicalDataExtractionTimeLowerBound when starting the pipe in 
realtime only mode.
-    //
-    // If we don't invoke flushDataRegionAllTsFiles() in the realtime only 
mode, the data generated
-    // between the creation time of the pipe the time when the pipe starts 
will be lost.
-    if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
-      synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
-        final long lastFlushedByPipeTime =
-            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
-        if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
-          flushDataRegionAllTsFiles();
-          DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, 
System.currentTimeMillis());
-        }
-      }
-    }
-
     if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
       shouldTransferModFile =
           parameters.getBooleanOrDefault(
@@ -597,8 +552,10 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
               .peek(originalResourceList::add)
               .filter(
                   resource ->
-                      // Some resource is marked as deleted but not removed 
from the list.
-                      !resource.isDeleted()
+                      isHistoricalExtractorEnabled
+                          &&
+                          // Some resource is marked as deleted but not 
removed from the list.
+                          !resource.isDeleted()
                           // Some resource is generated by pipe. We ignore 
them if the pipe should
                           // not transfer pipe requests.
                           && (!resource.isGeneratedByPipe() || 
isForwardingPipeRequests)
@@ -608,7 +565,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
                           !resource.isClosed()
                               || mayTsFileContainUnprocessedData(resource)
                                   && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                                  && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
                                   && 
mayTsFileResourceOverlappedWithPattern(resource)))
               .collect(Collectors.toList());
       filteredTsFileResources.addAll(sequenceTsFileResources);
@@ -618,8 +574,10 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
               .peek(originalResourceList::add)
               .filter(
                   resource ->
-                      // Some resource is marked as deleted but not removed 
from the list.
-                      !resource.isDeleted()
+                      isHistoricalExtractorEnabled
+                          &&
+                          // Some resource is marked as deleted but not 
removed from the list.
+                          !resource.isDeleted()
                           // Some resource is generated by pipe. We ignore 
them if the pipe should
                           // not transfer pipe requests.
                           && (!resource.isGeneratedByPipe() || 
isForwardingPipeRequests)
@@ -629,7 +587,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
                           !resource.isClosed()
                               || mayTsFileContainUnprocessedData(resource)
                                   && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                                  && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
                                   && 
mayTsFileResourceOverlappedWithPattern(resource)))
               .collect(Collectors.toList());
       filteredTsFileResources.addAll(unsequenceTsFileResources);
@@ -759,25 +716,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
         && historicalDataExtractionEndTime >= resource.getFileEndTime();
   }
 
-  private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(final 
TsFileResource resource) {
-    try {
-      return historicalDataExtractionTimeLowerBound
-          <= 
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
-    } catch (final IOException e) {
-      LOGGER.warn(
-          "Pipe {}@{}: failed to get the generation time of TsFile {}, extract 
it anyway"
-              + " (historical data extraction time lower bound: {})",
-          pipeName,
-          dataRegionId,
-          resource.getTsFilePath(),
-          historicalDataExtractionTimeLowerBound,
-          e);
-      // If failed to get the generation time of the TsFile, we will extract 
the data in the TsFile
-      // anyway.
-      return true;
-    }
-  }
-
   private void extractDeletions(
       final DeletionResourceManager deletionResourceManager,
       final List<PersistentResource> resourceList) {

Reply via email to