This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b33278688c2 Pipe: improve progress coverage checks (#17940)
b33278688c2 is described below

commit b33278688c2fc5f8c6f860253204b4e835f16487
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 17:11:26 2026 +0800

    Pipe: improve progress coverage checks (#17940)
    
    * Pipe: improve progress coverage checks
    
    * Pipe: address shutdown progress review comments
    
    * Pipe: refine Chinese shutdown progress messages
---
 .../apache/iotdb/db/i18n/DataNodePipeMessages.java |  39 ++++++
 .../apache/iotdb/db/i18n/DataNodePipeMessages.java |  43 ++++++-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 129 +++++++++++++++++--
 ...istoricalDataRegionTsFileAndDeletionSource.java | 136 ++++++++++++++-------
 .../PipeTsFileEpochProgressIndexKeeper.java        |   2 +-
 .../iotdb/db/service/DataNodeShutdownHook.java     |  10 +-
 ...ricalDataRegionTsFileAndDeletionSourceTest.java | 100 +++++++++++++++
 .../PipeTsFileEpochProgressIndexKeeperTest.java    |  46 ++++++-
 .../commons/consensus/index/ProgressIndex.java     |  11 ++
 9 files changed, 455 insertions(+), 61 deletions(-)

diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index fe924fbbaf9..0d1059d90b6 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -140,6 +140,45 @@ public final class DataNodePipeMessages {
           + "node may not be ready yet, and meta will be pushed by config node 
later.";
   public static final String FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE =
       "Failed to persist progress index to configNode, status: {}";
+  public static final String SHUTDOWN_PROGRESS_NOT_CONFIRMED =
+      "This shutdown progress was not confirmed to be persisted on 
ConfigNode.";
+  public static final String 
START_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+      "Start to persist all pipe progress indexes during shutdown, pipe count 
{}, deadline {} ms";
+  public static final String
+      INTERRUPTED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+          "Interrupted while persisting all pipe progress indexes during 
shutdown. "
+              + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String
+      TIMED_OUT_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+          "Timed out after {} ms while persisting all pipe progress indexes 
during shutdown. "
+              + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String 
FAILED_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+      "Failed to persist all pipe progress indexes during shutdown within {} 
ms. "
+          + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String 
COLLECTED_PIPE_METAS_FOR_SHUTDOWN_PROGRESS_PERSIST =
+      "Collected pipe metas for shutdown progress persist, pipe count {}, pipe 
meta count {}, "
+          + "pipe meta size {} bytes, took {} ms";
+  public static final String COLLECTED_EMPTY_PIPE_METAS_DURING_SHUTDOWN =
+      "Collected empty pipe metas for {} pipes during shutdown.";
+  public static final String 
START_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+      "Start to pushHeartbeat shutdown pipe meta to ConfigNode, dataNode id 
{}, pipe count {}, "
+          + "pipe meta count {}, pipe meta size {} bytes";
+  public static final String 
FAILED_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+      "Failed to pushHeartbeat shutdown pipe meta to ConfigNode, status {}, 
took {} ms. "
+          + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String
+      SUCCESSFULLY_FINISHED_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+          "Successfully finished pushHeartbeat shutdown pipe meta to 
ConfigNode, pipe count {}, "
+              + "pipe meta count {}, pipe meta size {} bytes, took {} ms";
+  public static final String
+      
EXCEPTION_OCCURRED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+          "Exception occurred while persisting all pipe progress indexes 
during shutdown. "
+              + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String PERSISTING_PIPE_PROGRESS_INDEXES_BEFORE_SHUTDOWN =
+      "Persisting pipe progress indexes before shutdown with timeout {} ms.";
+  public static final String 
PIPE_PROGRESS_INDEXES_WERE_NOT_CONFIRMED_DURING_SHUTDOWN =
+      "Pipe progress indexes were not confirmed by ConfigNode during shutdown. 
"
+          + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
   public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
       "Failure when register pipe plugin {}. Skip this plugin and continue 
startup.";
   public static final String
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index b286d7607f8..13369260db0 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -132,7 +132,46 @@ public final class DataNodePipeMessages {
       "获取 pipe task meta from config node. Ignore the exception 失败,原因:config 
node may not be "
           + "ready yet, and meta will be pushed by config node later.";
   public static final String FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE =
-      "持久化 progress index 到 configNode 失败,状态:{}";
+      "持久化进度索引到 ConfigNode 失败,状态:{}";
+  public static final String SHUTDOWN_PROGRESS_NOT_CONFIRMED =
+      "本次关闭流程中的进度未确认已持久化到 ConfigNode。";
+  public static final String 
START_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+      "开始在关闭期间持久化所有 Pipe 进度索引,Pipe 数量 {},超时时间 {} ms";
+  public static final String
+      INTERRUPTED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+          "在关闭期间持久化所有 Pipe 进度索引时被中断。"
+              + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String
+      TIMED_OUT_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+          "在关闭期间持久化所有 Pipe 进度索引超时,耗时 {} ms。"
+              + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String 
FAILED_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+      "在关闭期间持久化所有 Pipe 进度索引失败,耗时 {} ms。"
+          + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String 
COLLECTED_PIPE_METAS_FOR_SHUTDOWN_PROGRESS_PERSIST =
+      "已收集关闭期间进度持久化所需的 Pipe 元数据,Pipe 数量 {},Pipe 元数据数量 {},"
+          + "Pipe 元数据大小 {} 字节,耗时 {} ms";
+  public static final String COLLECTED_EMPTY_PIPE_METAS_DURING_SHUTDOWN =
+      "关闭期间为 {} 个 Pipe 收集到空 Pipe 元数据。";
+  public static final String 
START_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+      "开始向 ConfigNode 推送关闭期间的 Pipe 元数据心跳,DataNode ID {},Pipe 数量 {},"
+          + "Pipe 元数据数量 {},Pipe 元数据大小 {} 字节";
+  public static final String 
FAILED_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+      "向 ConfigNode 推送关闭期间的 Pipe 元数据心跳失败,状态 {},耗时 {} ms。"
+          + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String
+      SUCCESSFULLY_FINISHED_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+          "成功向 ConfigNode 推送关闭期间的 Pipe 元数据心跳,Pipe 数量 {},Pipe 元数据数量 {},"
+              + "Pipe 元数据大小 {} 字节,耗时 {} ms";
+  public static final String
+      
EXCEPTION_OCCURRED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+          "在关闭期间持久化所有 Pipe 进度索引时发生异常。"
+              + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+  public static final String PERSISTING_PIPE_PROGRESS_INDEXES_BEFORE_SHUTDOWN =
+      "关闭前正在持久化 Pipe 进度索引,超时时间 {} ms。";
+  public static final String 
PIPE_PROGRESS_INDEXES_WERE_NOT_CONFIRMED_DURING_SHUTDOWN =
+      "关闭期间 Pipe 进度索引未被 ConfigNode 确认。"
+          + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
   public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
       "注册 pipe plugin {} 失败。将跳过该插件并继续启动。";
   public static final String
@@ -209,7 +248,7 @@ public final class DataNodePipeMessages {
       "subtask {} 已关闭, ignore exception";
   public static final String SUBTASK_WORKER_IS_INTERRUPTED = "子任务工作线程被中断";
   public static final String SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO =
-      "成功 persisted all pipe's info to configNode。";
+      "成功将所有 Pipe 信息持久化到 ConfigNode。";
   public static final String THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN =
       "执行器 {} 和 {} 已成功关闭。";
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index dd15422dba1..033264b831f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -99,6 +99,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -623,25 +624,129 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   ///////////////////////// Shutdown Logic /////////////////////////
 
+  public long getShutdownProgressPersistTimeoutInMs() {
+    return Math.max(
+        1_000L,
+        (long) 
CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS()
+            + 
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS());
+  }
+
+  public boolean persistAllProgressIndex(final long timeoutInMs) {
+    final long normalizedTimeoutInMs = Math.max(1L, timeoutInMs);
+    final long startTime = System.currentTimeMillis();
+    final AtomicBoolean isConfirmed = new AtomicBoolean(false);
+    final Thread persistThread =
+        new Thread(
+            () -> isConfirmed.set(persistAllProgressIndexInternal()),
+            ThreadName.PIPE_RUNTIME_META_SYNCER.getName() + 
"-Shutdown-Persist");
+    persistThread.setDaemon(true);
+
+    LOGGER.info(
+        
DataNodePipeMessages.START_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN,
+        getPipeCount(),
+        normalizedTimeoutInMs);
+    persistThread.start();
+    try {
+      final long deadlineInMs = startTime + normalizedTimeoutInMs;
+      while (persistThread.isAlive()) {
+        final long remainingTimeInMs = deadlineInMs - 
System.currentTimeMillis();
+        if (remainingTimeInMs <= 0) {
+          break;
+        }
+        persistThread.join(remainingTimeInMs);
+      }
+    } catch (final InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.info(
+          DataNodePipeMessages
+              
.INTERRUPTED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN);
+      return false;
+    }
+
+    if (persistThread.isAlive()) {
+      LOGGER.warn(
+          
DataNodePipeMessages.TIMED_OUT_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN,
+          System.currentTimeMillis() - startTime);
+      return false;
+    }
+
+    if (!isConfirmed.get()) {
+      LOGGER.warn(
+          
DataNodePipeMessages.FAILED_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN,
+          System.currentTimeMillis() - startTime);
+    }
+    return isConfirmed.get();
+  }
+
   public void persistAllProgressIndex() {
-    try (final ConfigNodeClient configNodeClient =
-        
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
-      // Send request to some API server
+    persistAllProgressIndex(getShutdownProgressPersistTimeoutInMs());
+  }
+
+  private boolean persistAllProgressIndexInternal() {
+    final long collectStartTime = System.currentTimeMillis();
+    final int pipeCount = getPipeCount();
+    try {
       final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new 
ArrayList<>());
       collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+      final int pipeMetaCount = resp.getPipeMetaList().size();
+      final int pipeMetaSizeInBytes =
+          resp.getPipeMetaList().stream()
+              .filter(Objects::nonNull)
+              .mapToInt(ByteBuffer::remaining)
+              .sum();
+      LOGGER.info(
+          
DataNodePipeMessages.COLLECTED_PIPE_METAS_FOR_SHUTDOWN_PROGRESS_PERSIST,
+          pipeCount,
+          pipeMetaCount,
+          pipeMetaSizeInBytes,
+          System.currentTimeMillis() - collectStartTime);
+
       if (resp.getPipeMetaList().isEmpty()) {
-        return;
+        if (pipeCount != 0) {
+          
LOGGER.info(DataNodePipeMessages.COLLECTED_EMPTY_PIPE_METAS_DURING_SHUTDOWN, 
pipeCount);
+          return false;
+        }
+        return true;
       }
-      final TSStatus result =
-          configNodeClient.pushHeartbeat(
-              IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
-        
LOGGER.warn(DataNodePipeMessages.FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE,
 result);
-      } else {
-        
LOGGER.info(DataNodePipeMessages.SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO);
+
+      try (final ConfigNodeClient configNodeClient =
+          
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
+        LOGGER.info(
+            
DataNodePipeMessages.START_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE,
+            IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+            pipeCount,
+            pipeMetaCount,
+            pipeMetaSizeInBytes);
+        final long pushStartTime = System.currentTimeMillis();
+        final TSStatus result =
+            configNodeClient.pushHeartbeat(
+                IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), 
resp);
+        final long pushCostTime = System.currentTimeMillis() - pushStartTime;
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
+          
LOGGER.warn(DataNodePipeMessages.FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE,
 result);
+          LOGGER.warn(
+              
DataNodePipeMessages.FAILED_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE,
+              result,
+              pushCostTime);
+          return false;
+        } else {
+          LOGGER.info(
+              DataNodePipeMessages
+                  
.SUCCESSFULLY_FINISHED_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE,
+              pipeCount,
+              pipeMetaCount,
+              pipeMetaSizeInBytes,
+              pushCostTime);
+          
LOGGER.info(DataNodePipeMessages.SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO);
+          return true;
+        }
       }
     } catch (final Exception e) {
-      LOGGER.warn(e.getMessage());
+      LOGGER.warn(
+          DataNodePipeMessages
+              
.EXCEPTION_OCCURRED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN,
+          e);
+      return false;
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index e71d80a61a6..cb02df50a39 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -575,28 +575,12 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
           originalUnSequenceTsFileCount,
           startIndex);
 
+      final HistoricalTsFileExtractionStatistics statistics =
+          new HistoricalTsFileExtractionStatistics();
       final Map<TsFileResource, Set<String>> 
sequenceTsFileResources2TableNames =
           tsFileManager.getTsFileList(true).stream()
               .peek(originalResourceList::add)
-              .filter(
-                  resource ->
-                      isHistoricalSourceEnabled
-                          &&
-                          // Some resource is marked as deleted but not 
removed from the list.
-                          !resource.isDeleted()
-                          // Some resource is generated by pipe. We ignore 
them if the pipe should
-                          // not transfer pipe requests.
-                          && (!resource.isGeneratedByPipe() || 
isForwardingPipeRequests)
-                          && (
-                          // Some resource may not be closed due to the 
control of
-                          // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
-                          !resource.isClosed()
-                                  && 
Optional.ofNullable(resource.getProcessor())
-                                      
.map(TsFileProcessor::alreadyMarkedClosing)
-                                      .orElse(true)
-                              || mayTsFileContainUnprocessedData(resource)
-                                  && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                                  && 
mayTsFileResourceOverlappedWithPattern(resource)))
+              .filter(resource -> shouldExtractTsFileResource(resource, 
statistics))
               .collect(
                   Collectors.toMap(
                       Function.identity(),
@@ -611,25 +595,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       final Map<TsFileResource, Set<String>> 
unSequenceTsFileResources2TableNames =
           tsFileManager.getTsFileList(false).stream()
               .peek(originalResourceList::add)
-              .filter(
-                  resource ->
-                      isHistoricalSourceEnabled
-                          &&
-                          // Some resource is marked as deleted but not 
removed from the list.
-                          !resource.isDeleted()
-                          // Some resource is generated by pipe. We ignore 
them if the pipe should
-                          // not transfer pipe requests.
-                          && (!resource.isGeneratedByPipe() || 
isForwardingPipeRequests)
-                          && (
-                          // Some resource may not be closed due to the 
control of
-                          // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
-                          !resource.isClosed()
-                                  && 
Optional.ofNullable(resource.getProcessor())
-                                      
.map(TsFileProcessor::alreadyMarkedClosing)
-                                      .orElse(true)
-                              || mayTsFileContainUnprocessedData(resource)
-                                  && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                                  && 
mayTsFileResourceOverlappedWithPattern(resource)))
+              .filter(resource -> shouldExtractTsFileResource(resource, 
statistics))
               .collect(
                   Collectors.toMap(
                       Function.identity(),
@@ -652,6 +618,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
                       .pinTsFileResource(resource, shouldTransferModFile, 
pipeName);
                   return false;
                 } catch (final IOException e) {
+                  ++statistics.pinFailedCount;
                   LOGGER.warn(
                       DataNodePipeMessages.PIPE_FAILED_TO_PIN_TSFILERESOURCE,
                       resource.getTsFilePath(),
@@ -673,11 +640,77 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
           filteredTsFileResources2TableNames.size(),
           originalSequenceTsFileCount + originalUnSequenceTsFileCount,
           System.currentTimeMillis() - startHistoricalExtractionTime);
+      LOGGER.info(
+          "Pipe {}@{}: historical TsFile selection summary, selected by 
progress uncovered {}, "
+              + "selected by unclosed/closing {}, filtered by time/path {} 
(time {}, path {}), "
+              + "skipped covered {}, skipped deleted {}, skipped generated by 
pipe {}, "
+              + "pin failed {}",
+          pipeName,
+          dataRegionId,
+          statistics.selectedByProgressUncoveredCount,
+          statistics.selectedByUnclosedOrClosingCount,
+          statistics.filteredByTimeOrPathCount,
+          statistics.filteredByTimeCount,
+          statistics.filteredByPathCount,
+          statistics.skippedCoveredCount,
+          statistics.skippedDeletedCount,
+          statistics.skippedGeneratedByPipeCount,
+          statistics.pinFailedCount);
     } finally {
       tsFileManager.readUnlock();
     }
   }
 
+  private boolean shouldExtractTsFileResource(
+      final TsFileResource resource, final 
HistoricalTsFileExtractionStatistics statistics) {
+    if (!isHistoricalSourceEnabled) {
+      return false;
+    }
+
+    // Some resource is marked as deleted but not removed from the list.
+    if (resource.isDeleted()) {
+      ++statistics.skippedDeletedCount;
+      return false;
+    }
+
+    // Some resource is generated by pipe. We ignore them if the pipe should 
not transfer pipe
+    // requests.
+    if (resource.isGeneratedByPipe() && !isForwardingPipeRequests) {
+      ++statistics.skippedGeneratedByPipeCount;
+      return false;
+    }
+
+    // Some resource may not be closed due to the control of 
PIPE_MIN_FLUSH_INTERVAL_IN_MS. We
+    // simply ignore them.
+    if (!resource.isClosed()
+        && Optional.ofNullable(resource.getProcessor())
+            .map(TsFileProcessor::alreadyMarkedClosing)
+            .orElse(true)) {
+      ++statistics.selectedByUnclosedOrClosingCount;
+      return true;
+    }
+
+    if (!mayTsFileContainUnprocessedData(resource)) {
+      ++statistics.skippedCoveredCount;
+      return false;
+    }
+
+    if (!isTsFileResourceOverlappedWithTimeRange(resource)) {
+      ++statistics.filteredByTimeOrPathCount;
+      ++statistics.filteredByTimeCount;
+      return false;
+    }
+
+    if (!mayTsFileResourceOverlappedWithPattern(resource)) {
+      ++statistics.filteredByTimeOrPathCount;
+      ++statistics.filteredByPathCount;
+      return false;
+    }
+
+    ++statistics.selectedByProgressUncoveredCount;
+    return true;
+  }
+
   private boolean mayTsFileContainUnprocessedData(final TsFileResource 
resource) {
     if (startIndex instanceof TimeWindowStateProgressIndex) {
       // The resource is closed thus the TsFileResource#getFileEndTime() is 
safe to use
@@ -693,13 +726,15 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       // instead of replication or something else.
       ProgressIndex dedicatedProgressIndex =
           
tryToExtractLocalProgressIndexForIoTV2(resource.getMaxProgressIndexAfterClose());
-      return greaterThanStartIndex(resource, dedicatedProgressIndex);
+      return isProgressIndexNotCoveredByStartIndex(resource, 
dedicatedProgressIndex);
     }
-    return greaterThanStartIndex(resource, 
resource.getMaxProgressIndexAfterClose());
+    return isProgressIndexNotCoveredByStartIndex(
+        resource, resource.getMaxProgressIndexAfterClose());
   }
 
-  private boolean greaterThanStartIndex(PersistentResource resource, 
ProgressIndex progressIndex) {
-    if (!startIndex.isAfter(progressIndex) && 
!startIndex.equals(progressIndex)) {
+  private boolean isProgressIndexNotCoveredByStartIndex(
+      PersistentResource resource, ProgressIndex progressIndex) {
+    if (!startIndex.isEqualOrAfter(progressIndex)) {
       LOGGER.info(
           DataNodePipeMessages
               
.PIPE_RESOURCE_MEETS_MAYTSFILECONTAINUNPROCESSEDDATA_CONDITION_EXTRACT,
@@ -713,6 +748,19 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
     return false;
   }
 
+  private static class HistoricalTsFileExtractionStatistics {
+
+    private int selectedByProgressUncoveredCount;
+    private int selectedByUnclosedOrClosingCount;
+    private int filteredByTimeOrPathCount;
+    private int filteredByTimeCount;
+    private int filteredByPathCount;
+    private int skippedCoveredCount;
+    private int skippedDeletedCount;
+    private int skippedGeneratedByPipeCount;
+    private int pinFailedCount;
+  }
+
   private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource 
resource) {
     // Trimming to avoid unnecessary file device getter
     if (isDbNameCoveredByPattern) {
@@ -793,7 +841,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
               if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
                 toBeCompared = 
tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
               }
-              return !greaterThanStartIndex(resource, toBeCompared);
+              return !isProgressIndexNotCoveredByStartIndex(resource, 
toBeCompared);
             })
         .forEach(DeletionResource::decreaseReference);
     // Get deletions that should be sent.
@@ -805,7 +853,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
                   if 
(pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
                     toBeCompared = 
tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
                   }
-                  return greaterThanStartIndex(resource, toBeCompared);
+                  return isProgressIndexNotCoveredByStartIndex(resource, 
toBeCompared);
                 })
             .collect(Collectors.toList());
     resourceList.addAll(allDeletionResources);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index aaf03f570e2..8fe23303363 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -114,7 +114,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
         .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
         .map(Entry::getValue)
         .filter(Objects::nonNull)
-        .anyMatch(resource -> 
!resource.getMaxProgressIndex().isAfter(progressIndex));
+        .anyMatch(resource -> 
progressIndex.isEqualOrAfter(resource.getMaxProgressIndex()));
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index c2806e32528..681500a25aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.audit.DNAuditLogger;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
+import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
@@ -173,7 +174,14 @@ public class DataNodeShutdownHook extends Thread {
       }
     }
     // Persist progress index before shutdown to accurate recovery after 
restart
-    PipeDataNodeAgent.task().persistAllProgressIndex();
+    final long shutdownProgressPersistTimeoutInMs =
+        PipeDataNodeAgent.task().getShutdownProgressPersistTimeoutInMs();
+    logger.info(
+        DataNodePipeMessages.PERSISTING_PIPE_PROGRESS_INDEXES_BEFORE_SHUTDOWN,
+        shutdownProgressPersistTimeoutInMs);
+    if 
(!PipeDataNodeAgent.task().persistAllProgressIndex(shutdownProgressPersistTimeoutInMs))
 {
+      
logger.warn(DataNodePipeMessages.PIPE_PROGRESS_INDEXES_WERE_NOT_CONFIRMED_DURING_SHUTDOWN);
+    }
     // Shutdown all consensus pipe's receiver
     PipeDataNodeAgent.receiver().iotConsensusV2().closeReceiverExecutor();
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
index b54efc07f79..edffbb5b31c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
@@ -19,7 +19,12 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.historical;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
@@ -29,6 +34,7 @@ import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -39,6 +45,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -46,6 +53,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
@@ -195,6 +203,55 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
     }
   }
 
+  @Test
+  public void testMayTsFileContainUnprocessedDataUsesEqualOrAfterCoverage() 
throws Exception {
+    final File tempDir = 
Files.createTempDirectory("pipeHistoricalProgressCoverage").toFile();
+
+    try {
+      assertMayTsFileContainUnprocessedData(
+          tempDir,
+          "superset.tsfile",
+          hybridProgressIndex(
+              new IoTProgressIndex(Map.of(1, 100L, 2, 200L)),
+              new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+          hybridProgressIndex(
+              new IoTProgressIndex(Map.of(1, 100L)),
+              new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 9))),
+          false);
+
+      assertMayTsFileContainUnprocessedData(
+          tempDir,
+          "missing-dimension.tsfile",
+          hybridProgressIndex(new IoTProgressIndex(Map.of(1, 100L))),
+          hybridProgressIndex(
+              new IoTProgressIndex(Map.of(1, 90L)),
+              new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+          true);
+
+      assertMayTsFileContainUnprocessedData(
+          tempDir,
+          "larger-iot.tsfile",
+          hybridProgressIndex(
+              new IoTProgressIndex(Map.of(1, 100L, 2, 200L)),
+              new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+          hybridProgressIndex(
+              new IoTProgressIndex(Map.of(1, 101L)),
+              new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+          true);
+
+      final ProgressIndex recoverProgressIndex =
+          new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10));
+      assertMayTsFileContainUnprocessedData(
+          tempDir,
+          "old-sequence-recover.tsfile",
+          hybridProgressIndex(recoverProgressIndex, new 
IoTProgressIndex(Map.of(1, 100L))),
+          recoverProgressIndex,
+          false);
+    } finally {
+      FileUtils.deleteFileOrDirectory(tempDir);
+    }
+  }
+
   private static TsFileResource createTsFileResource(final File tempDir, final 
String fileName)
       throws IOException {
     final File file = new File(tempDir, fileName);
@@ -202,6 +259,49 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
     return new TsFileResource(file);
   }
 
+  private static TsFileResource createClosedTsFileResource(
+      final File tempDir, final String fileName, final ProgressIndex 
progressIndex)
+      throws IOException {
+    final TsFileResource resource = createTsFileResource(tempDir, fileName);
+    resource.setStatusForTest(TsFileResourceStatus.NORMAL);
+    resource.updateProgressIndex(progressIndex);
+    return resource;
+  }
+
+  private static void assertMayTsFileContainUnprocessedData(
+      final File tempDir,
+      final String fileName,
+      final ProgressIndex startIndex,
+      final ProgressIndex resourceProgressIndex,
+      final boolean expected)
+      throws Exception {
+    Assert.assertEquals(!expected, 
startIndex.isEqualOrAfter(resourceProgressIndex));
+
+    final PipeHistoricalDataRegionTsFileAndDeletionSource source =
+        new PipeHistoricalDataRegionTsFileAndDeletionSource();
+    setPrivateField(source, "pipeName", "pipe");
+    setPrivateField(source, "dataRegionId", 1);
+    setPrivateField(source, "startIndex", startIndex);
+
+    final Method method =
+        
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredMethod(
+            "mayTsFileContainUnprocessedData", TsFileResource.class);
+    method.setAccessible(true);
+    Assert.assertEquals(
+        expected,
+        method.invoke(
+            source, createClosedTsFileResource(tempDir, fileName, 
resourceProgressIndex)));
+  }
+
+  private static ProgressIndex hybridProgressIndex(
+      final ProgressIndex firstProgressIndex, final ProgressIndex... 
progressIndexes) {
+    ProgressIndex result = new HybridProgressIndex(firstProgressIndex);
+    for (final ProgressIndex progressIndex : progressIndexes) {
+      result = 
result.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+    }
+    return result;
+  }
+
   private static void setPrivateField(
       final PipeHistoricalDataRegionTsFileAndDeletionSource source,
       final String fieldName,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
index 4d27fff5d57..d4cc97d9213 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -31,6 +35,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Map;
 
 public class PipeTsFileEpochProgressIndexKeeperTest {
 
@@ -89,6 +94,31 @@ public class PipeTsFileEpochProgressIndexKeeperTest {
             new SimpleProgressIndex(1, 2L)));
   }
 
+  @Test
+  public void testProgressIndexCheckUsesEqualOrAfterCoverage() throws 
IOException {
+    final ProgressIndex registeredProgressIndex =
+        hybridProgressIndex(
+            new IoTProgressIndex(Map.of(1, 90L)),
+            new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10)));
+    keeper.registerProgressIndex(
+        DATA_REGION_ID,
+        TASK_SCOPE_A,
+        createTsFileResource("registered-hybrid.tsfile", 
registeredProgressIndex));
+
+    Assert.assertFalse(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID, TASK_SCOPE_A, "current.tsfile", new 
IoTProgressIndex(Map.of(1, 100L))));
+
+    Assert.assertTrue(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID,
+            TASK_SCOPE_A,
+            "current.tsfile",
+            hybridProgressIndex(
+                new IoTProgressIndex(Map.of(1, 100L)),
+                new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 
10)))));
+  }
+
   @Test
   public void testClearProgressIndexOnlyRemovesTargetTaskScope() throws 
IOException {
     final TsFileResource scopeAResource = 
createTsFileResource("scope-a.tsfile", 1L);
@@ -107,11 +137,25 @@ public class PipeTsFileEpochProgressIndexKeeperTest {
 
   private TsFileResource createTsFileResource(final String fileName, final 
long flushOrderId)
       throws IOException {
+    return createTsFileResource(fileName, new SimpleProgressIndex(1, 
flushOrderId));
+  }
+
+  private TsFileResource createTsFileResource(
+      final String fileName, final ProgressIndex progressIndex) throws 
IOException {
     final File file = new File(tempDir, fileName);
     Assert.assertTrue(file.createNewFile());
 
     final TsFileResource resource = new TsFileResource(file);
-    resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
+    resource.updateProgressIndex(progressIndex);
     return resource;
   }
+
+  private ProgressIndex hybridProgressIndex(
+      final ProgressIndex firstProgressIndex, final ProgressIndex... 
progressIndexes) {
+    ProgressIndex result = new HybridProgressIndex(firstProgressIndex);
+    for (final ProgressIndex progressIndex : progressIndexes) {
+      result = 
result.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+    }
+    return result;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 3c8d13bab54..979eee0c8db 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -116,6 +116,17 @@ public abstract class ProgressIndex implements Accountable 
{
     return super.hashCode();
   }
 
+  /**
+   * A.isEqualOrAfter(B) is true if and only if A already covers B in every 
tuple member. In other
+   * words, blending B into A does not advance A.
+   *
+   * @param progressIndex the progress index to be compared
+   * @return true if and only if this progress index is equal to or after the 
given progress index
+   */
+  public final boolean isEqualOrAfter(@Nonnull final ProgressIndex 
progressIndex) {
+    return 
updateToMinimumEqualOrIsAfterProgressIndex(progressIndex).equals(this);
+  }
+
   /**
    * Define the isEqualOrAfter relation, A.isEqualOrAfter(B) if and only if 
each tuple member in A
    * is greater than or equal to B in the corresponding total order relation.


Reply via email to