This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 44ed8578073 [To dev/1.3] Pipe: improve progress coverage checks
(#17967)
44ed8578073 is described below
commit 44ed8578073028b16cc5706907a96a3df8eb1717
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:02:44 2026 +0800
[To dev/1.3] Pipe: improve progress coverage checks (#17967)
* Pipe: improve progress coverage checks (#17940)
* Pipe: improve progress coverage checks
* Pipe: address shutdown progress review comments
* Pipe: refine Chinese shutdown progress messages
(cherry picked from commit b33278688c2fc5f8c6f860253204b4e835f16487)
* Fix Java 8 test compatibility
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 125 ++++++++++++++--
.../PipeHistoricalDataRegionTsFileSource.java | 88 ++++++------
.../PipeTsFileEpochProgressIndexKeeper.java | 80 +++++++++--
.../iotdb/db/service/DataNodeShutdownHook.java | 9 +-
.../PipeHistoricalDataRegionTsFileSourceTest.java | 123 ++++++++++++++++
.../PipeTsFileEpochProgressIndexKeeperTest.java | 160 +++++++++++++++++++++
.../commons/consensus/index/ProgressIndex.java | 11 ++
7 files changed, 524 insertions(+), 72 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 67b9460c15a..09d3aef90c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -99,6 +99,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -626,25 +627,125 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent
{
///////////////////////// Shutdown Logic /////////////////////////
+ public long getShutdownProgressPersistTimeoutInMs() {
+ return Math.max(
+ 1_000L,
+ (long)
CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS()
+ +
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS());
+ }
+
+ public boolean persistAllProgressIndex(final long timeoutInMs) {
+ final long normalizedTimeoutInMs = Math.max(1L, timeoutInMs);
+ final long startTime = System.currentTimeMillis();
+ final AtomicBoolean isConfirmed = new AtomicBoolean(false);
+ final Thread persistThread =
+ new Thread(
+ () -> isConfirmed.set(persistAllProgressIndexInternal()),
+ ThreadName.PIPE_RUNTIME_META_SYNCER.getName() +
"-Shutdown-Persist");
+ persistThread.setDaemon(true);
+
+ LOGGER.info(
+ "Start to persist all pipe progress indexes during shutdown, pipe
count {}, timeout {} ms.",
+ getPipeCount(),
+ normalizedTimeoutInMs);
+ persistThread.start();
+ try {
+ final long deadlineInMs = startTime + normalizedTimeoutInMs;
+ while (persistThread.isAlive()) {
+ final long remainingTimeInMs = deadlineInMs -
System.currentTimeMillis();
+ if (remainingTimeInMs <= 0) {
+ break;
+ }
+ persistThread.join(remainingTimeInMs);
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.info("Interrupted while persisting all pipe progress indexes
during shutdown.");
+ return false;
+ }
+
+ if (persistThread.isAlive()) {
+ LOGGER.warn(
+ "Timed out while persisting all pipe progress indexes during
shutdown, cost {} ms.",
+ System.currentTimeMillis() - startTime);
+ return false;
+ }
+
+ if (!isConfirmed.get()) {
+ LOGGER.warn(
+ "Failed to persist all pipe progress indexes during shutdown, cost
{} ms.",
+ System.currentTimeMillis() - startTime);
+ }
+ return isConfirmed.get();
+ }
+
public void persistAllProgressIndex() {
- try (final ConfigNodeClient configNodeClient =
-
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- // Send request to some API server
+ persistAllProgressIndex(getShutdownProgressPersistTimeoutInMs());
+ }
+
+ private boolean persistAllProgressIndexInternal() {
+ final long collectStartTime = System.currentTimeMillis();
+ final int pipeCount = getPipeCount();
+ try {
final TPipeHeartbeatResp resp = new TPipeHeartbeatResp(new
ArrayList<>());
collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+ final int pipeMetaCount = resp.getPipeMetaList().size();
+ final int pipeMetaSizeInBytes =
+ resp.getPipeMetaList().stream()
+ .filter(Objects::nonNull)
+ .mapToInt(ByteBuffer::remaining)
+ .sum();
+ LOGGER.info(
+ "Collected pipe metas for shutdown progress persist, pipe count {},
pipe meta count {}, pipe meta size {} bytes, cost {} ms.",
+ pipeCount,
+ pipeMetaCount,
+ pipeMetaSizeInBytes,
+ System.currentTimeMillis() - collectStartTime);
+
if (resp.getPipeMetaList().isEmpty()) {
- return;
+ if (pipeCount != 0) {
+ LOGGER.info(
+ "Collected empty pipe metas during shutdown while pipe count is
{}.", pipeCount);
+ return false;
+ }
+ return true;
}
- final TSStatus result =
- configNodeClient.pushHeartbeat(
- IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
- LOGGER.warn("Failed to persist progress index to configNode, status:
{}", result);
- } else {
- LOGGER.info("Successfully persisted all pipe's info to configNode.");
+
+ try (final ConfigNodeClient configNodeClient =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ LOGGER.info(
+ "Start to push heartbeat shutdown pipe meta to ConfigNode, data
node id {}, pipe count {}, pipe meta count {}, pipe meta size {} bytes.",
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+ pipeCount,
+ pipeMetaCount,
+ pipeMetaSizeInBytes);
+ final long pushStartTime = System.currentTimeMillis();
+ final TSStatus result =
+ configNodeClient.pushHeartbeat(
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
resp);
+ final long pushCostTime = System.currentTimeMillis() - pushStartTime;
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
+ LOGGER.warn("Failed to persist progress index to ConfigNode, status:
{}", result);
+ LOGGER.warn(
+ "Failed to push heartbeat shutdown pipe meta to ConfigNode,
status {}, cost {} ms.",
+ result,
+ pushCostTime);
+ return false;
+ } else {
+ LOGGER.info(
+ "Successfully finished pushing heartbeat shutdown pipe meta to
ConfigNode, pipe count {}, pipe meta count {}, pipe meta size {} bytes, cost {}
ms.",
+ pipeCount,
+ pipeMetaCount,
+ pipeMetaSizeInBytes,
+ pushCostTime);
+ LOGGER.info("Successfully persisted all pipe's info to ConfigNode.");
+ return true;
+ }
}
} catch (final Exception e) {
- LOGGER.warn(e.getMessage());
+ LOGGER.warn(
+ "Exception occurred while persisting all pipe progress indexes
during shutdown.", e);
+ return false;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index 54597cb1bcd..09b121bfae8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -372,53 +372,14 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
final Collection<TsFileResource> sequenceTsFileResources =
tsFileManager.getTsFileList(true).stream()
.peek(originalResourceList::add)
- .filter(
- resource ->
- isHistoricalSourceEnabled
- &&
- // Some resource is marked as deleted but not
removed from the list.
- !resource.isDeleted()
- // Some resource is generated by pipe. We ignore
them if the pipe should
- // not transfer pipe requests.
- && (!resource.isGeneratedByPipe() ||
isForwardingPipeRequests)
- && (
- // If the tsFile is not already marked closing, it
is not captured by
- // the pipe realtime module. Thus, we can wait for
the realtime sync
- // module to handle this, to avoid blocking the
pipe sync process.
- !resource.isClosed()
- &&
Optional.ofNullable(resource.getProcessor())
-
.map(TsFileProcessor::alreadyMarkedClosing)
- .orElse(true)
- || mayTsFileContainUnprocessedData(resource)
- &&
isTsFileResourceOverlappedWithTimeRange(resource)
- &&
mayTsFileResourceOverlappedWithPattern(resource)))
+ .filter(this::shouldExtractTsFileResource)
.collect(Collectors.toList());
- filteredTsFileResources.addAll(sequenceTsFileResources);
-
final Collection<TsFileResource> unSequenceTsFileResources =
tsFileManager.getTsFileList(false).stream()
.peek(originalResourceList::add)
- .filter(
- resource ->
- isHistoricalSourceEnabled
- &&
- // Some resource is marked as deleted but not
removed from the list.
- !resource.isDeleted()
- // Some resource is generated by pipe. We ignore
them if the pipe should
- // not transfer pipe requests.
- && (!resource.isGeneratedByPipe() ||
isForwardingPipeRequests)
- && (
- // If the tsFile is not already marked closing, it
is not captured by
- // the pipe realtime module. Thus, we can wait for
the realtime sync
- // module to handle this, to avoid blocking the
pipe sync process.
- !resource.isClosed()
- &&
Optional.ofNullable(resource.getProcessor())
-
.map(TsFileProcessor::alreadyMarkedClosing)
- .orElse(true)
- || mayTsFileContainUnprocessedData(resource)
- &&
isTsFileResourceOverlappedWithTimeRange(resource)
- &&
mayTsFileResourceOverlappedWithPattern(resource)))
+ .filter(this::shouldExtractTsFileResource)
.collect(Collectors.toList());
+ filteredTsFileResources.addAll(sequenceTsFileResources);
filteredTsFileResources.addAll(unSequenceTsFileResources);
filteredTsFileResources.removeIf(
@@ -464,6 +425,46 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
}
}
+ private boolean shouldExtractTsFileResource(final TsFileResource resource) {
+ if (!isHistoricalSourceEnabled) {
+ return false;
+ }
+
+ // Some resource is marked as deleted but not removed from the list.
+ if (resource.isDeleted()) {
+ return false;
+ }
+
+ // Some resource is generated by pipe. We ignore them if the pipe should
not transfer pipe
+ // requests.
+ if (resource.isGeneratedByPipe() && !isForwardingPipeRequests) {
+ return false;
+ }
+
+ // Some resource may not be closed due to the control of
PIPE_MIN_FLUSH_INTERVAL_IN_MS. We
+ // simply ignore them.
+ if (!resource.isClosed()
+ && Optional.ofNullable(resource.getProcessor())
+ .map(TsFileProcessor::alreadyMarkedClosing)
+ .orElse(true)) {
+ return true;
+ }
+
+ if (!mayTsFileContainUnprocessedData(resource)) {
+ return false;
+ }
+
+ if (!isTsFileResourceOverlappedWithTimeRange(resource)) {
+ return false;
+ }
+
+ if (!mayTsFileResourceOverlappedWithPattern(resource)) {
+ return false;
+ }
+
+ return true;
+ }
+
private boolean mayTsFileContainUnprocessedData(final TsFileResource
resource) {
if (startIndex instanceof TimeWindowStateProgressIndex) {
// The resource is closed thus the TsFileResource#getFileEndTime() is
safe to use
@@ -474,8 +475,7 @@ public class PipeHistoricalDataRegionTsFileSource
implements PipeHistoricalDataR
startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
}
- if (!startIndex.isAfter(resource.getMaxProgressIndex())
- && !startIndex.equals(resource.getMaxProgressIndex())) {
+ if (!startIndex.isEqualOrAfter(resource.getMaxProgressIndex())) {
LOGGER.info(
"Pipe {}@{}: file {} meets mayTsFileContainUnprocessedData
condition, extractor progressIndex: {}, resource ProgressIndex: {}",
pipeName,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index ff7d90c377d..e1ebe178dd0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -31,40 +31,90 @@ import java.util.concurrent.ConcurrentHashMap;
public class PipeTsFileEpochProgressIndexKeeper {
- // data region id -> pipeName -> tsFile path -> max progress index
+ // data region id -> task scope id -> tsFile path -> max progress index
private final Map<String, Map<String, Map<String, TsFileResource>>>
progressIndexKeeper =
new ConcurrentHashMap<>();
public synchronized void registerProgressIndex(
- final String dataRegionId, final String pipeName, final TsFileResource
resource) {
+ final String dataRegionId, final String taskScopeID, final
TsFileResource resource) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
.putIfAbsent(resource.getTsFilePath(), resource);
}
public synchronized void eliminateProgressIndex(
- final String dataRegionId, final @Nonnull String pipeName, final String
filePath) {
- progressIndexKeeper
- .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
- .remove(filePath);
+ final String dataRegionId, final @Nonnull String taskScopeID, final
String filePath) {
+ final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+ progressIndexKeeper.get(dataRegionId);
+ if (scopeProgressIndexKeeper == null) {
+ return;
+ }
+
+ final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+ scopeProgressIndexKeeper.get(taskScopeID);
+ if (tsFileProgressIndexKeeper == null) {
+ return;
+ }
+
+ tsFileProgressIndexKeeper.remove(filePath);
+ if (tsFileProgressIndexKeeper.isEmpty()) {
+ scopeProgressIndexKeeper.remove(taskScopeID);
+ if (scopeProgressIndexKeeper.isEmpty()) {
+ progressIndexKeeper.remove(dataRegionId);
+ }
+ }
+ }
+
+ public synchronized void clearProgressIndex(
+ final String dataRegionId, final @Nonnull String taskScopeID) {
+ final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+ progressIndexKeeper.get(dataRegionId);
+ if (scopeProgressIndexKeeper == null) {
+ return;
+ }
+
+ scopeProgressIndexKeeper.remove(taskScopeID);
+ if (scopeProgressIndexKeeper.isEmpty()) {
+ progressIndexKeeper.remove(dataRegionId);
+ }
+ }
+
+ public synchronized boolean containsTsFile(
+ final String dataRegionId, final @Nonnull String taskScopeID, final
String tsFilePath) {
+ final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+ progressIndexKeeper.get(dataRegionId);
+ if (scopeProgressIndexKeeper == null) {
+ return false;
+ }
+
+ final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+ scopeProgressIndexKeeper.get(taskScopeID);
+ return tsFileProgressIndexKeeper != null &&
tsFileProgressIndexKeeper.containsKey(tsFilePath);
}
public synchronized boolean isProgressIndexAfterOrEquals(
final String dataRegionId,
- final String pipeName,
+ final String taskScopeID,
final String tsFilePath,
final ProgressIndex progressIndex) {
- return progressIndexKeeper
- .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
- .entrySet()
- .stream()
+ final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+ progressIndexKeeper.get(dataRegionId);
+ if (scopeProgressIndexKeeper == null) {
+ return false;
+ }
+
+ final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+ scopeProgressIndexKeeper.get(taskScopeID);
+ if (tsFileProgressIndexKeeper == null) {
+ return false;
+ }
+
+ return tsFileProgressIndexKeeper.entrySet().stream()
.filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
.map(Entry::getValue)
.filter(Objects::nonNull)
- .anyMatch(resource ->
!resource.getMaxProgressIndex().isAfter(progressIndex));
+ .anyMatch(resource ->
progressIndex.isEqualOrAfter(resource.getMaxProgressIndex()));
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 4d0f4a5a412..76966820abb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -148,7 +148,14 @@ public class DataNodeShutdownHook extends Thread {
}
}
// Persist progress index before shutdown to accurate recovery after
restart
- PipeDataNodeAgent.task().persistAllProgressIndex();
+ final long shutdownProgressPersistTimeoutInMs =
+ PipeDataNodeAgent.task().getShutdownProgressPersistTimeoutInMs();
+ logger.info(
+ "Persisting pipe progress indexes before shutdown, timeout {} ms.",
+ shutdownProgressPersistTimeoutInMs);
+ if
(!PipeDataNodeAgent.task().persistAllProgressIndex(shutdownProgressPersistTimeoutInMs))
{
+ logger.warn("Pipe progress indexes were not confirmed during shutdown.");
+ }
// Actually stop all services started by the DataNode.
// If we don't call this, services like the RestService are not stopped
and I can't re-start
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSourceTest.java
new file mode 100644
index 00000000000..3b1396de5c1
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSourceTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.historical;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+
+public class PipeHistoricalDataRegionTsFileSourceTest {
+
+ @Test
+ public void testMayTsFileContainUnprocessedDataUsesEqualOrAfterCoverage()
throws Exception {
+ final File tempDir =
Files.createTempDirectory("pipeHistoricalProgressCoverage").toFile();
+
+ try {
+ assertMayTsFileContainUnprocessedData(
+ tempDir,
+ "superset.tsfile",
+ hybridProgressIndex(
+ new IoTProgressIndex(ImmutableMap.of(1, 100L, 2, 200L)),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+ hybridProgressIndex(
+ new IoTProgressIndex(1, 100L),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 9))),
+ false);
+
+ assertMayTsFileContainUnprocessedData(
+ tempDir,
+ "missing-dimension.tsfile",
+ hybridProgressIndex(new IoTProgressIndex(1, 100L)),
+ hybridProgressIndex(
+ new IoTProgressIndex(1, 90L),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10))),
+ true);
+ } finally {
+ FileUtils.deleteFileOrDirectory(tempDir);
+ }
+ }
+
+ private static void assertMayTsFileContainUnprocessedData(
+ final File tempDir,
+ final String fileName,
+ final ProgressIndex startIndex,
+ final ProgressIndex resourceProgressIndex,
+ final boolean expected)
+ throws Exception {
+ Assert.assertEquals(!expected,
startIndex.isEqualOrAfter(resourceProgressIndex));
+
+ final PipeHistoricalDataRegionTsFileSource source = new
PipeHistoricalDataRegionTsFileSource();
+ setPrivateField(source, "pipeName", "pipe");
+ setPrivateField(source, "dataRegionId", 1);
+ setPrivateField(source, "startIndex", startIndex);
+
+ final Method method =
+ PipeHistoricalDataRegionTsFileSource.class.getDeclaredMethod(
+ "mayTsFileContainUnprocessedData", TsFileResource.class);
+ method.setAccessible(true);
+ Assert.assertEquals(
+ expected,
+ method.invoke(
+ source, createClosedTsFileResource(tempDir, fileName,
resourceProgressIndex)));
+ }
+
+ private static TsFileResource createClosedTsFileResource(
+ final File tempDir, final String fileName, final ProgressIndex
progressIndex)
+ throws Exception {
+ final File file = new File(tempDir, fileName);
+ Assert.assertTrue(file.createNewFile());
+
+ final TsFileResource resource = new TsFileResource(file);
+ resource.setStatusForTest(TsFileResourceStatus.NORMAL);
+ resource.updateProgressIndex(progressIndex);
+ return resource;
+ }
+
+ private static ProgressIndex hybridProgressIndex(
+ final ProgressIndex firstProgressIndex, final ProgressIndex...
progressIndexes) {
+ ProgressIndex result = new HybridProgressIndex(firstProgressIndex);
+ for (final ProgressIndex progressIndex : progressIndexes) {
+ result =
result.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+ }
+ return result;
+ }
+
+ private static void setPrivateField(
+ final PipeHistoricalDataRegionTsFileSource source, final String
fieldName, final Object value)
+ throws ReflectiveOperationException {
+ final Field field =
PipeHistoricalDataRegionTsFileSource.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(source, value);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
new file mode 100644
index 00000000000..bfd45b708b6
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+public class PipeTsFileEpochProgressIndexKeeperTest {
+
+ private static final String DATA_REGION_ID = "1";
+ private static final String TASK_SCOPE_A = "task-scope-a";
+ private static final String TASK_SCOPE_B = "task-scope-b";
+
+ private final PipeTsFileEpochProgressIndexKeeper keeper =
+ PipeTsFileEpochProgressIndexKeeper.getInstance();
+
+ private File tempDir;
+
+ @Before
+ public void setUp() throws IOException {
+ tempDir =
Files.createTempDirectory("pipeTsFileEpochProgressIndexKeeper").toFile();
+ }
+
+ @After
+ public void tearDown() {
+ keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A);
+ keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_B);
+ FileUtils.deleteFileOrDirectory(tempDir);
+ }
+
+ @Test
+ public void testDuplicateTsFileLookupIsScopedByTaskInstance() throws
IOException {
+ final TsFileResource resource = createTsFileResource("shared.tsfile", 1L);
+
+ keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, resource);
+
+ Assert.assertTrue(
+ keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A,
resource.getTsFilePath()));
+ Assert.assertFalse(
+ keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B,
resource.getTsFilePath()));
+ }
+
+ @Test
+ public void testProgressIndexCheckDoesNotLeakAcrossTaskScopes() throws
IOException {
+ keeper.registerProgressIndex(
+ DATA_REGION_ID, TASK_SCOPE_A, createTsFileResource("1-1-0-0.tsfile",
1L));
+
+ final TsFileResource comparedResource =
createTsFileResource("1-2-0-0.tsfile", 2L);
+ keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A,
comparedResource);
+
+ Assert.assertTrue(
+ keeper.isProgressIndexAfterOrEquals(
+ DATA_REGION_ID,
+ TASK_SCOPE_A,
+ comparedResource.getTsFilePath(),
+ new SimpleProgressIndex(1, 2L)));
+ Assert.assertFalse(
+ keeper.isProgressIndexAfterOrEquals(
+ DATA_REGION_ID,
+ TASK_SCOPE_B,
+ comparedResource.getTsFilePath(),
+ new SimpleProgressIndex(1, 2L)));
+ }
+
+ @Test
+ public void testProgressIndexCheckUsesEqualOrAfterCoverage() throws
IOException {
+ final ProgressIndex registeredProgressIndex =
+ hybridProgressIndex(
+ new IoTProgressIndex(1, 90L),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0, 10)));
+ keeper.registerProgressIndex(
+ DATA_REGION_ID,
+ TASK_SCOPE_A,
+ createTsFileResource("registered-hybrid.tsfile",
registeredProgressIndex));
+
+ Assert.assertFalse(
+ keeper.isProgressIndexAfterOrEquals(
+ DATA_REGION_ID, TASK_SCOPE_A, "current.tsfile", new
IoTProgressIndex(1, 100L)));
+
+ Assert.assertTrue(
+ keeper.isProgressIndexAfterOrEquals(
+ DATA_REGION_ID,
+ TASK_SCOPE_A,
+ "current.tsfile",
+ hybridProgressIndex(
+ new IoTProgressIndex(1, 100L),
+ new RecoverProgressIndex(-1, new SimpleProgressIndex(0,
10)))));
+ }
+
+ @Test
+ public void testClearProgressIndexOnlyRemovesTargetTaskScope() throws
IOException {
+ final TsFileResource scopeAResource =
createTsFileResource("scope-a.tsfile", 1L);
+ final TsFileResource scopeBResource =
createTsFileResource("scope-b.tsfile", 1L);
+
+ keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, scopeAResource);
+ keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_B, scopeBResource);
+
+ keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A);
+
+ Assert.assertFalse(
+ keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A,
scopeAResource.getTsFilePath()));
+ Assert.assertTrue(
+ keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B,
scopeBResource.getTsFilePath()));
+ }
+
+ private TsFileResource createTsFileResource(final String fileName, final
long flushOrderId)
+ throws IOException {
+ return createTsFileResource(fileName, new SimpleProgressIndex(1,
flushOrderId));
+ }
+
+ private TsFileResource createTsFileResource(
+ final String fileName, final ProgressIndex progressIndex) throws
IOException {
+ final File file = new File(tempDir, fileName);
+ Assert.assertTrue(file.createNewFile());
+
+ final TsFileResource resource = new TsFileResource(file);
+ resource.updateProgressIndex(progressIndex);
+ return resource;
+ }
+
+ private ProgressIndex hybridProgressIndex(
+ final ProgressIndex firstProgressIndex, final ProgressIndex...
progressIndexes) {
+ ProgressIndex result = new HybridProgressIndex(firstProgressIndex);
+ for (final ProgressIndex progressIndex : progressIndexes) {
+ result =
result.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+ }
+ return result;
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 3c8d13bab54..979eee0c8db 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -116,6 +116,17 @@ public abstract class ProgressIndex implements Accountable
{
return super.hashCode();
}
+ /**
+ * A.isEqualOrAfter(B) is true if and only if A already covers B in every
tuple member. In other
+ * words, blending B into A does not advance A.
+ *
+ * @param progressIndex the progress index to be compared
+ * @return true if and only if this progress index is equal to or after the
given progress index
+ */
+ public final boolean isEqualOrAfter(@Nonnull final ProgressIndex
progressIndex) {
+ return
updateToMinimumEqualOrIsAfterProgressIndex(progressIndex).equals(this);
+ }
+
/**
* Define the isEqualOrAfter relation, A.isEqualOrAfter(B) if and only if
each tuple member in A
* is greater than or equal to B in the corresponding total order relation.