This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 061d2de0285 Pipe:Fix DataNodeShutdownHook waiting report logic and add
capture history file log (#15952)
061d2de0285 is described below
commit 061d2de02851ca9332325dbdf59d10883b6c3f6e
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jul 16 14:35:22 2025 +0800
Pipe:Fix DataNodeShutdownHook waiting report logic and add capture history
file log (#15952)
* Pipe:Fix DataNodeShutdownHook waiting report logic and add capture
history file log
* fix
---
...oricalDataRegionTsFileAndDeletionExtractor.java | 22 ++++++++++++++++------
.../iotdb/db/service/DataNodeShutdownHook.java | 2 +-
2 files changed, 17 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index e18c2907f52..f752b2c50c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -703,13 +703,23 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
// instead of replication or something else.
ProgressIndex dedicatedProgressIndex =
tryToExtractLocalProgressIndexForIoTV2(resource.getMaxProgressIndexAfterClose());
- return greaterThanStartIndex(dedicatedProgressIndex);
+ return greaterThanStartIndex(resource, dedicatedProgressIndex);
}
- return greaterThanStartIndex(resource.getMaxProgressIndexAfterClose());
+ return greaterThanStartIndex(resource,
resource.getMaxProgressIndexAfterClose());
}
- private boolean greaterThanStartIndex(ProgressIndex progressIndex) {
- return !startIndex.isAfter(progressIndex) &&
!startIndex.equals(progressIndex);
+ private boolean greaterThanStartIndex(PersistentResource resource,
ProgressIndex progressIndex) {
+ if (!startIndex.isAfter(progressIndex) &&
!startIndex.equals(progressIndex)) {
+ LOGGER.info(
+ "Pipe {}@{}: resource {} meets mayTsFileContainUnprocessedData
condition, extractor progressIndex: {}, resource ProgressIndex: {}",
+ pipeName,
+ dataRegionId,
+ resource,
+ startIndex,
+ progressIndex);
+ return true;
+ }
+ return false;
}
private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource
resource) {
@@ -809,7 +819,7 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
toBeCompared =
tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
}
- return !greaterThanStartIndex(toBeCompared);
+ return !greaterThanStartIndex(resource, toBeCompared);
})
.forEach(DeletionResource::decreaseReference);
// Get deletions that should be sent.
@@ -821,7 +831,7 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
if
(pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
toBeCompared =
tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
}
- return greaterThanStartIndex(toBeCompared);
+ return greaterThanStartIndex(resource, toBeCompared);
})
.collect(Collectors.toList());
resourceList.addAll(allDeletionResources);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 9c31923b7bc..e9b125818fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -128,7 +128,7 @@ public class DataNodeShutdownHook extends Thread {
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet())
{
boolean timeout = false;
while (true) {
- if (entry.getValue().getRemainingNonHeartbeatEvents() > 0) {
+ if (entry.getValue().getRemainingNonHeartbeatEvents() == 0) {
logger.info(
"Successfully waited for pipe {} to finish.",
entry.getValue().getPipeName());
break;