This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b33278688c2 Pipe: improve progress coverage checks (#17940)
b33278688c2 is described below
commit b33278688c2fc5f8c6f860253204b4e835f16487
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 17:11:26 2026 +0800
Pipe: improve progress coverage checks (#17940)
* Pipe: improve progress coverage checks
* Pipe: address shutdown progress review comments
* Pipe: refine Chinese shutdown progress messages
---
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 39 ++++++
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 43 ++++++-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 129 +++++++++++++++++--
...istoricalDataRegionTsFileAndDeletionSource.java | 136 ++++++++++++++-------
.../PipeTsFileEpochProgressIndexKeeper.java | 2 +-
.../iotdb/db/service/DataNodeShutdownHook.java | 10 +-
...ricalDataRegionTsFileAndDeletionSourceTest.java | 100 +++++++++++++++
.../PipeTsFileEpochProgressIndexKeeperTest.java | 46 ++++++-
.../commons/consensus/index/ProgressIndex.java | 11 ++
9 files changed, 455 insertions(+), 61 deletions(-)
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index fe924fbbaf9..0d1059d90b6 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -140,6 +140,45 @@ public final class DataNodePipeMessages {
+ "node may not be ready yet, and meta will be pushed by config node
later.";
public static final String FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE =
"Failed to persist progress index to configNode, status: {}";
+ public static final String SHUTDOWN_PROGRESS_NOT_CONFIRMED =
+ "This shutdown progress was not confirmed to be persisted on
ConfigNode.";
+ public static final String
START_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "Start to persist all pipe progress indexes during shutdown, pipe count
{}, deadline {} ms";
+ public static final String
+ INTERRUPTED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "Interrupted while persisting all pipe progress indexes during
shutdown. "
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String
+ TIMED_OUT_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "Timed out after {} ms while persisting all pipe progress indexes
during shutdown. "
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String
FAILED_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "Failed to persist all pipe progress indexes during shutdown within {}
ms. "
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String
COLLECTED_PIPE_METAS_FOR_SHUTDOWN_PROGRESS_PERSIST =
+ "Collected pipe metas for shutdown progress persist, pipe count {}, pipe
meta count {}, "
+ + "pipe meta size {} bytes, took {} ms";
+ public static final String COLLECTED_EMPTY_PIPE_METAS_DURING_SHUTDOWN =
+ "Collected empty pipe metas for {} pipes during shutdown.";
+ public static final String
START_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+ "Start to pushHeartbeat shutdown pipe meta to ConfigNode, dataNode id
{}, pipe count {}, "
+ + "pipe meta count {}, pipe meta size {} bytes";
+ public static final String
FAILED_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+ "Failed to pushHeartbeat shutdown pipe meta to ConfigNode, status {},
took {} ms. "
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String
+ SUCCESSFULLY_FINISHED_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+ "Successfully finished pushHeartbeat shutdown pipe meta to
ConfigNode, pipe count {}, "
+ + "pipe meta count {}, pipe meta size {} bytes, took {} ms";
+ public static final String
+
EXCEPTION_OCCURRED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "Exception occurred while persisting all pipe progress indexes
during shutdown. "
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String PERSISTING_PIPE_PROGRESS_INDEXES_BEFORE_SHUTDOWN =
+ "Persisting pipe progress indexes before shutdown with timeout {} ms.";
+ public static final String
PIPE_PROGRESS_INDEXES_WERE_NOT_CONFIRMED_DURING_SHUTDOWN =
+ "Pipe progress indexes were not confirmed by ConfigNode during shutdown.
"
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
"Failure when register pipe plugin {}. Skip this plugin and continue
startup.";
public static final String
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index b286d7607f8..13369260db0 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -132,7 +132,46 @@ public final class DataNodePipeMessages {
"获取 pipe task meta from config node. Ignore the exception 失败,原因:config
node may not be "
+ "ready yet, and meta will be pushed by config node later.";
public static final String FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE =
- "持久化 progress index 到 configNode 失败,状态:{}";
+ "持久化进度索引到 ConfigNode 失败,状态:{}";
+ public static final String SHUTDOWN_PROGRESS_NOT_CONFIRMED =
+ "本次关闭流程中的进度未确认已持久化到 ConfigNode。";
+ public static final String
START_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "开始在关闭期间持久化所有 Pipe 进度索引,Pipe 数量 {},超时时间 {} ms";
+ public static final String
+ INTERRUPTED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "在关闭期间持久化所有 Pipe 进度索引时被中断。"
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String
+ TIMED_OUT_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "在关闭期间持久化所有 Pipe 进度索引超时,耗时 {} ms。"
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String
FAILED_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "在关闭期间持久化所有 Pipe 进度索引失败,耗时 {} ms。"
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String
COLLECTED_PIPE_METAS_FOR_SHUTDOWN_PROGRESS_PERSIST =
+ "已收集关闭期间进度持久化所需的 Pipe 元数据,Pipe 数量 {},Pipe 元数据数量 {},"
+ + "Pipe 元数据大小 {} 字节,耗时 {} ms";
+ public static final String COLLECTED_EMPTY_PIPE_METAS_DURING_SHUTDOWN =
+ "关闭期间为 {} 个 Pipe 收集到空 Pipe 元数据。";
+ public static final String
START_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+ "开始向 ConfigNode 推送关闭期间的 Pipe 元数据心跳,DataNode ID {},Pipe 数量 {},"
+ + "Pipe 元数据数量 {},Pipe 元数据大小 {} 字节";
+ public static final String
FAILED_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+ "向 ConfigNode 推送关闭期间的 Pipe 元数据心跳失败,状态 {},耗时 {} ms。"
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String
+ SUCCESSFULLY_FINISHED_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE =
+ "成功向 ConfigNode 推送关闭期间的 Pipe 元数据心跳,Pipe 数量 {},Pipe 元数据数量 {},"
+ + "Pipe 元数据大小 {} 字节,耗时 {} ms";
+ public static final String
+
EXCEPTION_OCCURRED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN =
+ "在关闭期间持久化所有 Pipe 进度索引时发生异常。"
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
+ public static final String PERSISTING_PIPE_PROGRESS_INDEXES_BEFORE_SHUTDOWN =
+ "关闭前正在持久化 Pipe 进度索引,超时时间 {} ms。";
+ public static final String
PIPE_PROGRESS_INDEXES_WERE_NOT_CONFIRMED_DURING_SHUTDOWN =
+ "关闭期间 Pipe 进度索引未被 ConfigNode 确认。"
+ + SHUTDOWN_PROGRESS_NOT_CONFIRMED;
public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
"注册 pipe plugin {} 失败。将跳过该插件并继续启动。";
public static final String
@@ -209,7 +248,7 @@ public final class DataNodePipeMessages {
"subtask {} 已关闭, ignore exception";
public static final String SUBTASK_WORKER_IS_INTERRUPTED = "子任务工作线程被中断";
public static final String SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO =
- "成功 persisted all pipe's info to configNode。";
+ "成功将所有 Pipe 信息持久化到 ConfigNode。";
public static final String THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN =
"执行器 {} 和 {} 已成功关闭。";
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index dd15422dba1..033264b831f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -99,6 +99,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -623,25 +624,129 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Shutdown Logic /////////////////////////
+ public long getShutdownProgressPersistTimeoutInMs() {
+ return Math.max(
+ 1_000L,
+ (long)
CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS()
+ +
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS());
+ }
+
+ public boolean persistAllProgressIndex(final long timeoutInMs) {
+ final long normalizedTimeoutInMs = Math.max(1L, timeoutInMs);
+ final long startTime = System.currentTimeMillis();
+ final AtomicBoolean isConfirmed = new AtomicBoolean(false);
+ final Thread persistThread =
+ new Thread(
+ () -> isConfirmed.set(persistAllProgressIndexInternal()),
+ ThreadName.PIPE_RUNTIME_META_SYNCER.getName() +
"-Shutdown-Persist");
+ persistThread.setDaemon(true);
+
+ LOGGER.info(
+
DataNodePipeMessages.START_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN,
+ getPipeCount(),
+ normalizedTimeoutInMs);
+ persistThread.start();
+ try {
+ final long deadlineInMs = startTime + normalizedTimeoutInMs;
+ while (persistThread.isAlive()) {
+ final long remainingTimeInMs = deadlineInMs -
System.currentTimeMillis();
+ if (remainingTimeInMs <= 0) {
+ break;
+ }
+ persistThread.join(remainingTimeInMs);
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.info(
+ DataNodePipeMessages
+
.INTERRUPTED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN);
+ return false;
+ }
+
+ if (persistThread.isAlive()) {
+ LOGGER.warn(
+
DataNodePipeMessages.TIMED_OUT_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN,
+ System.currentTimeMillis() - startTime);
+ return false;
+ }
+
+ if (!isConfirmed.get()) {
+ LOGGER.warn(
+
DataNodePipeMessages.FAILED_TO_PERSIST_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN,
+ System.currentTimeMillis() - startTime);
+ }
+ return isConfirmed.get();
+ }
+
public void persistAllProgressIndex() {
- try (final ConfigNodeClient configNodeClient =
-
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- // Send request to some API server
+ persistAllProgressIndex(getShutdownProgressPersistTimeoutInMs());
+ }
+
+ private boolean persistAllProgressIndexInternal() {
+ final long collectStartTime = System.currentTimeMillis();
+ final int pipeCount = getPipeCount();
+ try {
final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new
ArrayList<>());
collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+ final int pipeMetaCount = resp.getPipeMetaList().size();
+ final int pipeMetaSizeInBytes =
+ resp.getPipeMetaList().stream()
+ .filter(Objects::nonNull)
+ .mapToInt(ByteBuffer::remaining)
+ .sum();
+ LOGGER.info(
+
DataNodePipeMessages.COLLECTED_PIPE_METAS_FOR_SHUTDOWN_PROGRESS_PERSIST,
+ pipeCount,
+ pipeMetaCount,
+ pipeMetaSizeInBytes,
+ System.currentTimeMillis() - collectStartTime);
+
if (resp.getPipeMetaList().isEmpty()) {
- return;
+ if (pipeCount != 0) {
+
LOGGER.info(DataNodePipeMessages.COLLECTED_EMPTY_PIPE_METAS_DURING_SHUTDOWN,
pipeCount);
+ return false;
+ }
+ return true;
}
- final TSStatus result =
- configNodeClient.pushHeartbeat(
- IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
-
LOGGER.warn(DataNodePipeMessages.FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE,
result);
- } else {
-
LOGGER.info(DataNodePipeMessages.SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO);
+
+ try (final ConfigNodeClient configNodeClient =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ LOGGER.info(
+
DataNodePipeMessages.START_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE,
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+ pipeCount,
+ pipeMetaCount,
+ pipeMetaSizeInBytes);
+ final long pushStartTime = System.currentTimeMillis();
+ final TSStatus result =
+ configNodeClient.pushHeartbeat(
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
resp);
+ final long pushCostTime = System.currentTimeMillis() - pushStartTime;
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
+
LOGGER.warn(DataNodePipeMessages.FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE,
result);
+ LOGGER.warn(
+
DataNodePipeMessages.FAILED_TO_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE,
+ result,
+ pushCostTime);
+ return false;
+ } else {
+ LOGGER.info(
+ DataNodePipeMessages
+
.SUCCESSFULLY_FINISHED_PUSH_HEARTBEAT_SHUTDOWN_PIPE_META_TO_CONFIGNODE,
+ pipeCount,
+ pipeMetaCount,
+ pipeMetaSizeInBytes,
+ pushCostTime);
+
LOGGER.info(DataNodePipeMessages.SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO);
+ return true;
+ }
}
} catch (final Exception e) {
- LOGGER.warn(e.getMessage());
+ LOGGER.warn(
+ DataNodePipeMessages
+
.EXCEPTION_OCCURRED_WHILE_PERSISTING_ALL_PIPE_PROGRESS_INDEXES_DURING_SHUTDOWN,
+ e);
+ return false;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index e71d80a61a6..cb02df50a39 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -575,28 +575,12 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
originalUnSequenceTsFileCount,
startIndex);
+ final HistoricalTsFileExtractionStatistics statistics =
+ new HistoricalTsFileExtractionStatistics();
final Map<TsFileResource, Set<String>>
sequenceTsFileResources2TableNames =
tsFileManager.getTsFileList(true).stream()
.peek(originalResourceList::add)
- .filter(
- resource ->
- isHistoricalSourceEnabled
- &&
- // Some resource is marked as deleted but not
removed from the list.
- !resource.isDeleted()
- // Some resource is generated by pipe. We ignore
them if the pipe should
- // not transfer pipe requests.
- && (!resource.isGeneratedByPipe() ||
isForwardingPipeRequests)
- && (
- // Some resource may not be closed due to the
control of
- // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
- !resource.isClosed()
- &&
Optional.ofNullable(resource.getProcessor())
-
.map(TsFileProcessor::alreadyMarkedClosing)
- .orElse(true)
- || mayTsFileContainUnprocessedData(resource)
- &&
isTsFileResourceOverlappedWithTimeRange(resource)
- &&
mayTsFileResourceOverlappedWithPattern(resource)))
+ .filter(resource -> shouldExtractTsFileResource(resource,
statistics))
.collect(
Collectors.toMap(
Function.identity(),
@@ -611,25 +595,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
final Map<TsFileResource, Set<String>>
unSequenceTsFileResources2TableNames =
tsFileManager.getTsFileList(false).stream()
.peek(originalResourceList::add)
- .filter(
- resource ->
- isHistoricalSourceEnabled
- &&
- // Some resource is marked as deleted but not
removed from the list.
- !resource.isDeleted()
- // Some resource is generated by pipe. We ignore
them if the pipe should
- // not transfer pipe requests.
- && (!resource.isGeneratedByPipe() ||
isForwardingPipeRequests)
- && (
- // Some resource may not be closed due to the
control of
- // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
- !resource.isClosed()
- &&
Optional.ofNullable(resource.getProcessor())
-
.map(TsFileProcessor::alreadyMarkedClosing)
- .orElse(true)
- || mayTsFileContainUnprocessedData(resource)
- &&
isTsFileResourceOverlappedWithTimeRange(resource)
- &&
mayTsFileResourceOverlappedWithPattern(resource)))
+ .filter(resource -> shouldExtractTsFileResource(resource,
statistics))
.collect(
Collectors.toMap(
Function.identity(),
@@ -652,6 +618,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
.pinTsFileResource(resource, shouldTransferModFile,
pipeName);
return false;
} catch (final IOException e) {
+ ++statistics.pinFailedCount;
LOGGER.warn(
DataNodePipeMessages.PIPE_FAILED_TO_PIN_TSFILERESOURCE,
resource.getTsFilePath(),
@@ -673,11 +640,77 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
filteredTsFileResources2TableNames.size(),
originalSequenceTsFileCount + originalUnSequenceTsFileCount,
System.currentTimeMillis() - startHistoricalExtractionTime);
+ LOGGER.info(
+ "Pipe {}@{}: historical TsFile selection summary, selected by
progress uncovered {}, "
+ + "selected by unclosed/closing {}, filtered by time/path {}
(time {}, path {}), "
+ + "skipped covered {}, skipped deleted {}, skipped generated by
pipe {}, "
+ + "pin failed {}",
+ pipeName,
+ dataRegionId,
+ statistics.selectedByProgressUncoveredCount,
+ statistics.selectedByUnclosedOrClosingCount,
+ statistics.filteredByTimeOrPathCount,
+ statistics.filteredByTimeCount,
+ statistics.filteredByPathCount,
+ statistics.skippedCoveredCount,
+ statistics.skippedDeletedCount,
+ statistics.skippedGeneratedByPipeCount,
+ statistics.pinFailedCount);
} finally {
tsFileManager.readUnlock();
}
}
+ private boolean shouldExtractTsFileResource(
+ final TsFileResource resource, final
HistoricalTsFileExtractionStatistics statistics) {
+ if (!isHistoricalSourceEnabled) {
+ return false;
+ }
+
+ // Some resource is marked as deleted but not removed from the list.
+ if (resource.isDeleted()) {
+ ++statistics.skippedDeletedCount;
+ return false;
+ }
+
+ // Some resource is generated by pipe. We ignore them if the pipe should
not transfer pipe
+ // requests.
+ if (resource.isGeneratedByPipe() && !isForwardingPipeRequests) {
+ ++statistics.skippedGeneratedByPipeCount;
+ return false;
+ }
+
+ // Some resource may not be closed due to the control of
PIPE_MIN_FLUSH_INTERVAL_IN_MS. We
+ // simply ignore them.
+ if (!resource.isClosed()
+ && Optional.ofNullable(resource.getProcessor())
+ .map(TsFileProcessor::alreadyMarkedClosing)
+ .orElse(true)) {
+ ++statistics.selectedByUnclosedOrClosingCount;
+ return true;
+ }
+
+ if (!mayTsFileContainUnprocessedData(resource)) {
+ ++statistics.skippedCoveredCount;
+ return false;
+ }
+
+ if (!isTsFileResourceOverlappedWithTimeRange(resource)) {
+ ++statistics.filteredByTimeOrPathCount;
+ ++statistics.filteredByTimeCount;
+ return false;
+ }
+
+ if (!mayTsFileResourceOverlappedWithPattern(resource)) {
+ ++statistics.filteredByTimeOrPathCount;
+ ++statistics.filteredByPathCount;
+ return false;
+ }
+
+ ++statistics.selectedByProgressUncoveredCount;
+ return true;
+ }
+
private boolean mayTsFileContainUnprocessedData(final TsFileResource
resource) {
if (startIndex instanceof TimeWindowStateProgressIndex) {
// The resource is closed thus the TsFileResource#getFileEndTime() is
safe to use
@@ -693,13 +726,15 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
// instead of replication or something else.
ProgressIndex dedicatedProgressIndex =
tryToExtractLocalProgressIndexForIoTV2(resource.getMaxProgressIndexAfterClose());
- return greaterThanStartIndex(resource, dedicatedProgressIndex);
+ return isProgressIndexNotCoveredByStartIndex(resource,
dedicatedProgressIndex);
}
- return greaterThanStartIndex(resource,
resource.getMaxProgressIndexAfterClose());
+ return isProgressIndexNotCoveredByStartIndex(
+ resource, resource.getMaxProgressIndexAfterClose());
}
- private boolean greaterThanStartIndex(PersistentResource resource,
ProgressIndex progressIndex) {
- if (!startIndex.isAfter(progressIndex) &&
!startIndex.equals(progressIndex)) {
+ private boolean isProgressIndexNotCoveredByStartIndex(
+ PersistentResource resource, ProgressIndex progressIndex) {
+ if (!startIndex.isEqualOrAfter(progressIndex)) {
LOGGER.info(
DataNodePipeMessages
.PIPE_RESOURCE_MEETS_MAYTSFILECONTAINUNPROCESSEDDATA_CONDITION_EXTRACT,
@@ -713,6 +748,19 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
return false;
}
+ private static class HistoricalTsFileExtractionStatistics {
+
+ private int selectedByProgressUncoveredCount;
+ private int selectedByUnclosedOrClosingCount;
+ private int filteredByTimeOrPathCount;
+ private int filteredByTimeCount;
+ private int filteredByPathCount;
+ private int skippedCoveredCount;
+ private int skippedDeletedCount;
+ private int skippedGeneratedByPipeCount;
+ private int pinFailedCount;
+ }
+
private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource
resource) {
// Trimming to avoid unnecessary file device getter
if (isDbNameCoveredByPattern) {
@@ -793,7 +841,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
toBeCompared =
tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
}
- return !greaterThanStartIndex(resource, toBeCompared);
+ return !isProgressIndexNotCoveredByStartIndex(resource,
toBeCompared);
})
.forEach(DeletionResource::decreaseReference);
// Get deletions that should be sent.
@@ -805,7 +853,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
if
(pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
toBeCompared =
tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
}
- return greaterThanStartIndex(resource, toBeCompared);
+ return isProgressIndexNotCoveredByStartIndex(resource,
toBeCompared);
})
.collect(Collectors.toList());
resourceList.addAll(allDeletionResources);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index aaf03f570e2..8fe23303363 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -114,7 +114,7 @@ public class PipeTsFileEpochProgressIndexKeeper {
.filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
.map(Entry::getValue)
.filter(Objects::nonNull)
- .anyMatch(resource ->
!resource.getMaxProgressIndex().isAfter(progressIndex));
+ .anyMatch(resource ->
progressIndex.isEqualOrAfter(resource.getMaxProgressIndex()));
}
//////////////////////////// singleton ////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index c2806e32528..681500a25aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.audit.DNAuditLogger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
+import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
@@ -173,7 +174,14 @@ public class DataNodeShutdownHook extends Thread {
}
}
// Persist progress index before shutdown to accurate recovery after
restart
- PipeDataNodeAgent.task().persistAllProgressIndex();
+ final long shutdownProgressPersistTimeoutInMs =
+ PipeDataNodeAgent.task().getShutdownProgressPersistTimeoutInMs();
+ logger.info(
+ DataNodePipeMessages.PERSISTING_PIPE_PROGRESS_INDEXES_BEFORE_SHUTDOWN,
+ shutdownProgressPersistTimeoutInMs);
+ if
(!PipeDataNodeAgent.task().persistAllProgressIndex(shutdownProgressPersistTimeoutInMs))
{
+
logger.warn(DataNodePipeMessages.PIPE_PROGRESS_INDEXES_WERE_NOT_CONFIRMED_DURING_SHUTDOWN);
+ }
// Shutdown all consensus pipe's receiver
PipeDataNodeAgent.receiver().iotConsensusV2().closeReceiverExecutor();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
index b54efc07f79..edffbb5b31c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
@@ -19,7 +19,12 @@
package org.apache.iotdb.db.pipe.source.dataregion.historical;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
@@ -29,6 +34,7 @@ import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -39,6 +45,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.nio.file.Files;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -46,6 +53,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
@@ -195,6 +203,55 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
}
}
+ @Test
+ public void testMayTsFileContainUnprocessedDataUsesEqualOrAfterCoverage()
throws Exception {
+ final File tempDir =
Files.createTempDirectory("pipeHistoricalProgressCoverage").toFile();
+
+ try {
+ assertMayTsFileContainUnprocessedData(
+ tempDir,
+ "superset.tsfile",
+ hybridProgressIndex(
+ new IoTProgressIndex(Map.of(1, 100L, 2, 200L)),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+ hybridProgressIndex(
+ new IoTProgressIndex(Map.of(1, 100L)),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 9))),
+ false);
+
+ assertMayTsFileContainUnprocessedData(
+ tempDir,
+ "missing-dimension.tsfile",
+ hybridProgressIndex(new IoTProgressIndex(Map.of(1, 100L))),
+ hybridProgressIndex(
+ new IoTProgressIndex(Map.of(1, 90L)),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+ true);
+
+ assertMayTsFileContainUnprocessedData(
+ tempDir,
+ "larger-iot.tsfile",
+ hybridProgressIndex(
+ new IoTProgressIndex(Map.of(1, 100L, 2, 200L)),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+ hybridProgressIndex(
+ new IoTProgressIndex(Map.of(1, 101L)),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+ true);
+
+ final ProgressIndex recoverProgressIndex =
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10));
+ assertMayTsFileContainUnprocessedData(
+ tempDir,
+ "old-sequence-recover.tsfile",
+ hybridProgressIndex(recoverProgressIndex, new
IoTProgressIndex(Map.of(1, 100L))),
+ recoverProgressIndex,
+ false);
+ } finally {
+ FileUtils.deleteFileOrDirectory(tempDir);
+ }
+ }
+
private static TsFileResource createTsFileResource(final File tempDir, final
String fileName)
throws IOException {
final File file = new File(tempDir, fileName);
@@ -202,6 +259,49 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
return new TsFileResource(file);
}
+ private static TsFileResource createClosedTsFileResource(
+ final File tempDir, final String fileName, final ProgressIndex
progressIndex)
+ throws IOException {
+ final TsFileResource resource = createTsFileResource(tempDir, fileName);
+ resource.setStatusForTest(TsFileResourceStatus.NORMAL);
+ resource.updateProgressIndex(progressIndex);
+ return resource;
+ }
+
+ private static void assertMayTsFileContainUnprocessedData(
+ final File tempDir,
+ final String fileName,
+ final ProgressIndex startIndex,
+ final ProgressIndex resourceProgressIndex,
+ final boolean expected)
+ throws Exception {
+ Assert.assertEquals(!expected,
startIndex.isEqualOrAfter(resourceProgressIndex));
+
+ final PipeHistoricalDataRegionTsFileAndDeletionSource source =
+ new PipeHistoricalDataRegionTsFileAndDeletionSource();
+ setPrivateField(source, "pipeName", "pipe");
+ setPrivateField(source, "dataRegionId", 1);
+ setPrivateField(source, "startIndex", startIndex);
+
+ final Method method =
+
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredMethod(
+ "mayTsFileContainUnprocessedData", TsFileResource.class);
+ method.setAccessible(true);
+ Assert.assertEquals(
+ expected,
+ method.invoke(
+ source, createClosedTsFileResource(tempDir, fileName,
resourceProgressIndex)));
+ }
+
+ private static ProgressIndex hybridProgressIndex(
+ final ProgressIndex firstProgressIndex, final ProgressIndex...
progressIndexes) {
+ ProgressIndex result = new HybridProgressIndex(firstProgressIndex);
+ for (final ProgressIndex progressIndex : progressIndexes) {
+ result =
result.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+ }
+ return result;
+ }
+
private static void setPrivateField(
final PipeHistoricalDataRegionTsFileAndDeletionSource source,
final String fieldName,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
index 4d27fff5d57..d4cc97d9213 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -31,6 +35,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Map;
public class PipeTsFileEpochProgressIndexKeeperTest {
@@ -89,6 +94,31 @@ public class PipeTsFileEpochProgressIndexKeeperTest {
new SimpleProgressIndex(1, 2L)));
}
+ @Test
+ public void testProgressIndexCheckUsesEqualOrAfterCoverage() throws
IOException {
+ final ProgressIndex registeredProgressIndex =
+ hybridProgressIndex(
+ new IoTProgressIndex(Map.of(1, 90L)),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10)));
+ keeper.registerProgressIndex(
+ DATA_REGION_ID,
+ TASK_SCOPE_A,
+ createTsFileResource("registered-hybrid.tsfile",
registeredProgressIndex));
+
+ Assert.assertFalse(
+ keeper.isProgressIndexAfterOrEquals(
+ DATA_REGION_ID, TASK_SCOPE_A, "current.tsfile", new
IoTProgressIndex(Map.of(1, 100L))));
+
+ Assert.assertTrue(
+ keeper.isProgressIndexAfterOrEquals(
+ DATA_REGION_ID,
+ TASK_SCOPE_A,
+ "current.tsfile",
+ hybridProgressIndex(
+ new IoTProgressIndex(Map.of(1, 100L)),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0,
10)))));
+ }
+
@Test
public void testClearProgressIndexOnlyRemovesTargetTaskScope() throws
IOException {
final TsFileResource scopeAResource =
createTsFileResource("scope-a.tsfile", 1L);
@@ -107,11 +137,25 @@ public class PipeTsFileEpochProgressIndexKeeperTest {
private TsFileResource createTsFileResource(final String fileName, final
long flushOrderId)
throws IOException {
+ return createTsFileResource(fileName, new SimpleProgressIndex(1,
flushOrderId));
+ }
+
+ private TsFileResource createTsFileResource(
+ final String fileName, final ProgressIndex progressIndex) throws
IOException {
final File file = new File(tempDir, fileName);
Assert.assertTrue(file.createNewFile());
final TsFileResource resource = new TsFileResource(file);
- resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
+ resource.updateProgressIndex(progressIndex);
return resource;
}
+
+ private ProgressIndex hybridProgressIndex(
+ final ProgressIndex firstProgressIndex, final ProgressIndex...
progressIndexes) {
+ ProgressIndex result = new HybridProgressIndex(firstProgressIndex);
+ for (final ProgressIndex progressIndex : progressIndexes) {
+ result =
result.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+ }
+ return result;
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 3c8d13bab54..979eee0c8db 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -116,6 +116,17 @@ public abstract class ProgressIndex implements Accountable {
return super.hashCode();
}
+ /**
+ * A.isEqualOrAfter(B) is true if and only if A already covers B in every
tuple member. In other
+ * words, blending B into A does not advance A.
+ *
+ * @param progressIndex the progress index to be compared
+ * @return true if and only if this progress index is equal to or after the
given progress index
+ */
+ public final boolean isEqualOrAfter(@Nonnull final ProgressIndex
progressIndex) {
+ return
updateToMinimumEqualOrIsAfterProgressIndex(progressIndex).equals(this);
+ }
+
/**
* Define the isEqualOrAfter relation, A.isEqualOrAfter(B) if and only if
each tuple member in A
* is greater than or equal to B in the corresponding total order relation.