This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new eb1e786fa15 Revert Pipe/IoTV2: Try to persist progressIndex in local
for resend event more efficently (#15924)
eb1e786fa15 is described below
commit eb1e786fa159132ad2daec4fe960a81595573d27
Author: Caideyipi <[email protected]>
AuthorDate: Sat Jul 12 16:27:32 2025 +0800
Revert Pipe/IoTV2: Try to persist progressIndex in local for resend event
more efficently (#15924)
* Revert
* Continue revert
---
.../runtime/heartbeat/PipeHeartbeatParser.java | 2 -
.../confignode/persistence/pipe/PipeTaskInfo.java | 7 +-
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 4 +-
.../impl/pipe/task/AlterPipeProcedureV2.java | 16 +--
.../impl/pipe/task/CreatePipeProcedureV2.java | 18 +--
.../confignode/service/ConfigNodeShutdownHook.java | 3 -
.../request/ConfigPhysicalPlanSerDeTest.java | 12 +-
.../consensus/response/pipe/PipeTableRespTest.java | 6 +-
.../agent/PipeConfigNodeSubtaskExecutorTest.java | 3 +-
.../iotdb/confignode/persistence/PipeInfoTest.java | 4 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 27 ----
...oricalDataRegionTsFileAndDeletionExtractor.java | 9 +-
.../iotdb/db/service/DataNodeShutdownHook.java | 3 -
.../iotdb/commons/concurrent/ThreadName.java | 5 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 55 --------
.../agent/runtime/PipePeriodicalJobExecutor.java | 33 -----
.../commons/pipe/agent/task/PipeTaskAgent.java | 3 -
.../commons/pipe/agent/task/meta/PipeMeta.java | 6 +-
.../pipe/agent/task/meta/PipeRuntimeMeta.java | 51 ++------
.../commons/pipe/agent/task/meta/PipeTaskMeta.java | 142 +--------------------
.../iotdb/commons/pipe/config/PipeConfig.java | 21 ---
.../iotdb/commons/pipe/config/PipeDescriptor.java | 18 ---
.../iotdb/commons/pipe/task/PipeMetaDeSerTest.java | 22 +---
23 files changed, 52 insertions(+), 418 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 6f6c0b2d443..e2303fecdea 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -244,7 +244,6 @@ public class PipeHeartbeatParser {
.equals(PipeStatus.STOPPED)) {
PipeRuntimeMeta runtimeMeta =
pipeMetaFromCoordinator.getRuntimeMeta();
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
- runtimeMeta.onSetPipeDroppedOrStopped();
runtimeMeta.setIsStoppedByRuntimeException(true);
needWriteConsensusOnConfigNodes.set(true);
@@ -274,7 +273,6 @@ public class PipeHeartbeatParser {
exceptionMap.put(nodeId, exception);
}
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
- runtimeMeta.onSetPipeDroppedOrStopped();
runtimeMeta.setIsStoppedByRuntimeException(true);
needWriteConsensusOnConfigNodes.set(true);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index b014013bee8..30333c66768 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -627,11 +627,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
if (newLeader != -1) {
consensusGroupIdToTaskMetaMap.put(
consensusGroupId.getId(),
- new PipeTaskMeta(
- MinimumProgressIndex.INSTANCE,
- newLeader,
- consensusGroupId.getId(),
- false));
+ new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
}
// else:
// "The pipe task meta does not contain the data
region group {} or
@@ -805,7 +801,6 @@ public class PipeTaskInfo implements SnapshotProcessor {
// Mark the status of the pipe with exception as stopped
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
- runtimeMeta.onSetPipeDroppedOrStopped();
runtimeMeta.setIsStoppedByRuntimeException(true);
final Map<Integer, PipeRuntimeException> exceptionMap =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 07ce978c16b..43b1660681e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -158,9 +158,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
consensusGroupIdToTaskMetaMap.get(taskIndex).setLeaderNodeId(newLeader);
} else {
consensusGroupIdToTaskMetaMap.put(
- taskIndex,
- new PipeTaskMeta(
- MinimumProgressIndex.INSTANCE, newLeader,
taskIndex, false));
+ taskIndex, new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
}
});
final Set<Integer> taskIdToRemove =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 259b865e59f..a1305e99d28 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -167,11 +167,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
(taskId, pipeTaskMeta) -> {
updatedConsensusGroupIdToTaskMetaMap.put(
taskId,
- new PipeTaskMeta(
- pipeTaskMeta.getProgressIndex(),
- pipeTaskMeta.getLeaderNodeId(),
- taskId,
- false));
+ new PipeTaskMeta(pipeTaskMeta.getProgressIndex(),
pipeTaskMeta.getLeaderNodeId()));
});
} else {
// data regions & schema regions
@@ -192,11 +188,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
// Pipe only collect user's data, filter metric database
here.
updatedConsensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
- new PipeTaskMeta(
- currentPipeTaskMeta.getProgressIndex(),
- regionLeaderNodeId,
- regionGroupId.getId(),
- false));
+ new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(),
regionLeaderNodeId));
}
});
@@ -212,9 +204,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
new PipeTaskMeta(
configRegionTaskMeta.getProgressIndex(),
// The leader of the config region is the config node itself
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- configRegionTaskMeta.getProgressIndex().hashCode(),
- false));
+
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index b557f4c7aef..130d964f240 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -272,9 +272,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
groupId.getId(),
new PipeTaskMeta(
new RecoverProgressIndex(senderDataNodeId, new
SimpleProgressIndex(0, 0)),
- senderDataNodeId,
- groupId.getId(),
- false));
+ senderDataNodeId));
} else if (pipeStaticMeta.isSourceExternal()) {
// external source
final PipeExternalSourceLoadBalancer loadBalancer =
@@ -298,9 +296,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
.forEach(
(taskIndex, leaderNodeId) -> {
consensusGroupIdToTaskMetaMap.put(
- taskIndex,
- new PipeTaskMeta(
- MinimumProgressIndex.INSTANCE, leaderNodeId,
taskIndex, false));
+ taskIndex, new PipeTaskMeta(MinimumProgressIndex.INSTANCE,
leaderNodeId));
});
} else {
// data regions & schema regions
@@ -317,11 +313,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
// Pipe only collect user's data, filter out metric database
here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
- new PipeTaskMeta(
- MinimumProgressIndex.INSTANCE,
- regionLeaderNodeId,
- regionGroupId.getId(),
- false));
+ new PipeTaskMeta(MinimumProgressIndex.INSTANCE,
regionLeaderNodeId));
}
});
@@ -333,9 +325,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE,
// The leader of the config region is the config node itself
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- Integer.MIN_VALUE,
- false));
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
}
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
index bd12adbd804..5c3ec5af063 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -88,8 +87,6 @@ public class ConfigNodeShutdownHook extends Thread {
"Reporting ConfigNode shutdown failed. The cluster will still take
the current ConfigNode as Running for a few seconds.");
}
}
- // Shutdown pipe progressIndex background service
- PipePeriodicalJobExecutor.shutdownBackgroundService();
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index a4403a88ec8..1505640ff90 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -886,7 +886,7 @@ public class ConfigPhysicalPlanSerDeTest {
extractorAttributes.put("extractor",
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
processorAttributes.put("processor",
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector",
"org.apache.iotdb.pipe.protocol.ThriftTransporter");
- final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
final PipeStaticMeta pipeStaticMeta =
@@ -911,7 +911,7 @@ public class ConfigPhysicalPlanSerDeTest {
extractorAttributes.put("pattern", "root.db");
processorAttributes.put("processor", "do-nothing-processor");
connectorAttributes.put("batch.enable", "false");
- final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new
ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
final PipeStaticMeta pipeStaticMeta =
@@ -949,7 +949,7 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void OperateMultiplePipesPlanV2Test() throws IOException {
- final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new
ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
final PipeStaticMeta pipeStaticMeta =
@@ -962,7 +962,7 @@ public class ConfigPhysicalPlanSerDeTest {
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
final CreatePipePlanV2 createPipePlanV2 = new
CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
- final PipeTaskMeta pipeTaskMeta1 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2, 2, false);
+ final PipeTaskMeta pipeTaskMeta1 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new
ConcurrentHashMap<>();
pipeTasks.put(2, pipeTaskMeta1);
final PipeStaticMeta pipeStaticMeta1 =
@@ -1061,8 +1061,8 @@ public class ConfigPhysicalPlanSerDeTest {
new PipeRuntimeMeta(
new ConcurrentHashMap<Integer, PipeTaskMeta>() {
{
- put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987, 1,
false));
- put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789,
1, false));
+ put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987));
+ put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789));
}
});
pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
index 04dea675015..94189a19d99 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
@@ -54,7 +54,7 @@ public class PipeTableRespTest {
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");
- PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
@@ -74,7 +74,7 @@ public class PipeTableRespTest {
connectorAttributes1.put("host", "127.0.0.1");
connectorAttributes1.put("port", "6667");
- PipeTaskMeta pipeTaskMeta1 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta1 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new
ConcurrentHashMap<>();
pipeTasks1.put(1, pipeTaskMeta1);
PipeStaticMeta pipeStaticMeta1 =
@@ -94,7 +94,7 @@ public class PipeTableRespTest {
connectorAttributes2.put("host", "172.30.30.30");
connectorAttributes2.put("port", "6667");
- PipeTaskMeta pipeTaskMeta2 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta2 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks2 = new
ConcurrentHashMap<>();
pipeTasks2.put(1, pipeTaskMeta2);
PipeStaticMeta pipeStaticMeta2 =
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
index a43a87b120e..f2fa5b0205a 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
@@ -62,8 +62,7 @@ public class PipeConfigNodeSubtaskExecutorTest {
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName());
}
},
- new PipeTaskMeta(
- MinimumProgressIndex.INSTANCE, Integer.MIN_VALUE,
Integer.MIN_VALUE, false)));
+ new PipeTaskMeta(MinimumProgressIndex.INSTANCE,
Integer.MIN_VALUE)));
}
@After
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 815d5c1757e..c3e7916108f 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -86,7 +86,7 @@ public class PipeInfoTest {
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");
- PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
@@ -121,7 +121,7 @@ public class PipeInfoTest {
extractorAttributes.put("extractor",
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
processorAttributes.put("processor",
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector",
"org.apache.iotdb.pipe.protocol.ThriftTransporter");
- PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index adee9d1774b..ab78200f688 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -848,11 +848,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Shutdown Logic /////////////////////////
public void persistAllProgressIndex() {
- persistAllProgressIndexLocally();
- persistAllProgressIndex2ConfigNode();
- }
-
- private void persistAllProgressIndex2ConfigNode() {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
// Send request to some API server
@@ -871,28 +866,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
- private void persistAllProgressIndexLocally() {
- if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
- LOGGER.info(
- "Pipe progress index persist disabled. Skipping persist all progress
index locally.");
- return;
- }
- if (!tryReadLockWithTimeOut(10)) {
- LOGGER.info("Failed to persist all progress index locally because of
timeout.");
- return;
- }
- try {
- for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
- pipeMeta.getRuntimeMeta().persistProgressIndex();
- }
- LOGGER.info("Persist all progress index locally successfully.");
- } catch (final Exception e) {
- LOGGER.warn("Failed to record all progress index locally, because {}.",
e.getMessage(), e);
- } finally {
- releaseReadLock();
- }
- }
-
///////////////////////// Pipe Consensus /////////////////////////
public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final
int consensusGroupId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index 707c0fef63d..e18c2907f52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -310,12 +310,11 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
pipeTaskMeta = environment.getPipeTaskMeta();
-
- // progressIndex is immutable in
`updateToMinimumEqualOrIsAfterProgressIndex`, so data
- // consistency in `environment.getPipeTaskMeta().getProgressIndex()` is
ensured.
- startIndex = environment.getPipeTaskMeta().restoreProgressIndex();
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
- startIndex = tryToExtractLocalProgressIndexForIoTV2(startIndex);
+ startIndex =
+
tryToExtractLocalProgressIndexForIoTV2(environment.getPipeTaskMeta().getProgressIndex());
+ } else {
+ startIndex = environment.getPipeTaskMeta().getProgressIndex();
}
dataRegionId = environment.getRegionId();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 11faf429732..9c31923b7bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -24,7 +24,6 @@ import
org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -156,8 +155,6 @@ public class DataNodeShutdownHook extends Thread {
PipeDataNodeAgent.task().persistAllProgressIndex();
// Shutdown all consensus pipe's receiver
PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor();
- // Shutdown pipe progressIndex background service
- PipePeriodicalJobExecutor.shutdownBackgroundService();
// Actually stop all services started by the DataNode.
// If we don't call this, services like the RestService are not stopped
and I can't re-start
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index cd54414af6d..95a880be3ff 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -149,7 +149,6 @@ public enum ThreadName {
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
-
PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE("Pipe-Progress-Index-Background-Service"),
LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"),
SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
@@ -271,8 +270,7 @@ public enum ThreadName {
PIPE_CONSENSUS_RPC_PROCESSOR,
ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL,
PIPE_CONSENSUS_DELETION_SERIALIZE,
- PIPE_CONSENSUS_TSFILE_WRITER_CHECKER,
- PIPE_CONSENSUS_BACKGROUND_TASK_EXECUTOR));
+ PIPE_CONSENSUS_TSFILE_WRITER_CHECKER));
private static final Set<ThreadName> ratisThreadNames =
new HashSet<>(
@@ -309,7 +307,6 @@ public enum ThreadName {
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
PIPE_RECEIVER_AIR_GAP_AGENT,
PIPE_AIR_GAP_RECEIVER,
- PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE,
SUBSCRIPTION_EXECUTOR_POOL,
SUBSCRIPTION_RUNTIME_META_SYNCER,
WINDOW_EVALUATION_SERVICE,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 42970e1895c..b7bbac6a28c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -195,8 +195,6 @@ public class CommonConfig {
private String pipeHardlinkTsFileDirName = "tsfile";
- private String pipeProgressIndexPersistDirName = "progress";
-
private String pipeHardlinkWALDirName = "wal";
private boolean pipeHardLinkWALEnabled = false;
@@ -265,9 +263,6 @@ public class CommonConfig {
private long pipeMetaSyncerSyncIntervalMinutes = 3;
private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
private boolean pipeAutoRestartEnabled = true;
- private boolean pipeProgressIndexPersistEnabled = true;
- private long pipeProgressIndexPersistCheckPointGap = 20;
- private long pipeProgressIndexFlushIntervalMs = 20 * 1000L;
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;
@@ -757,18 +752,6 @@ public class CommonConfig {
logger.info("pipeHardlinkTsFileDirName is set to {}.", pipeTsFileDirName);
}
- public String getPipeProgressIndexPersistDirName() {
- return pipeProgressIndexPersistDirName;
- }
-
- public void setPipeProgressIndexPersistDirName(String
pipeProgressIndexPersistDirName) {
- if (Objects.equals(this.pipeProgressIndexPersistDirName,
pipeProgressIndexPersistDirName)) {
- return;
- }
- this.pipeProgressIndexPersistDirName = pipeProgressIndexPersistDirName;
- logger.info("pipeProgressIndexPersistDir is set to {}.",
pipeProgressIndexPersistDirName);
- }
-
public String getPipeHardlinkWALDirName() {
return pipeHardlinkWALDirName;
}
@@ -1227,44 +1210,6 @@ public class CommonConfig {
logger.info("pipeAutoRestartEnabled is set to {}.",
pipeAutoRestartEnabled);
}
- public boolean isPipeProgressIndexPersistEnabled() {
- return pipeProgressIndexPersistEnabled;
- }
-
- public void setPipeProgressIndexPersistEnabled(boolean
pipeProgressIndexPersistEnabled) {
- if (this.pipeProgressIndexPersistEnabled ==
pipeProgressIndexPersistEnabled) {
- return;
- }
- this.pipeProgressIndexPersistEnabled = pipeProgressIndexPersistEnabled;
- logger.info("pipeProgressIndexPersistEnabled is set to {}.",
pipeProgressIndexPersistEnabled);
- }
-
- public long getPipeProgressIndexPersistCheckPointGap() {
- return pipeProgressIndexPersistCheckPointGap;
- }
-
- public void setPipeProgressIndexPersistCheckPointGap(long
pipeProgressIndexPersistCheckPointGap) {
- if (this.pipeProgressIndexPersistCheckPointGap ==
pipeProgressIndexPersistCheckPointGap) {
- return;
- }
- this.pipeProgressIndexPersistCheckPointGap =
pipeProgressIndexPersistCheckPointGap;
- logger.info(
- "pipeProgressIndexPersistCheckPointGap is set to {}.",
- pipeProgressIndexPersistCheckPointGap);
- }
-
- public long getPipeProgressIndexFlushIntervalMs() {
- return pipeProgressIndexFlushIntervalMs;
- }
-
- public void setPipeProgressIndexFlushIntervalMs(long
pipeProgressIndexFlushIntervalMs) {
- if (this.pipeProgressIndexFlushIntervalMs ==
pipeProgressIndexFlushIntervalMs) {
- return;
- }
- this.pipeProgressIndexFlushIntervalMs = pipeProgressIndexFlushIntervalMs;
- logger.info("pipeProgressIndexFlushIntervalMs is set to {}.",
pipeProgressIndexFlushIntervalMs);
- }
-
public long getPipeConnectorRetryIntervalMs() {
return pipeConnectorRetryIntervalMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index 33ac03c5c96..3226b3947f0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -21,16 +21,8 @@ package org.apache.iotdb.commons.pipe.agent.runtime;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
/**
* The shortest scheduling cycle for these jobs is {@link
* PipeConfig#getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()},
suitable for jobs that are
@@ -38,31 +30,6 @@ import java.util.concurrent.TimeUnit;
*/
public class PipePeriodicalJobExecutor extends
AbstractPipePeriodicalJobExecutor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
- // This background service is used to execute jobs that need to be cancelled
and released.
- private static final ScheduledExecutorService backgroundService =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE.getName());
-
- public static Future<?> submitBackgroundJob(
- Runnable job, long initialDelayInMs, long periodInMs) {
- return ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- backgroundService, job, initialDelayInMs, periodInMs,
TimeUnit.MILLISECONDS);
- }
-
- public static void shutdownBackgroundService() {
- backgroundService.shutdownNow();
- try {
- if (!backgroundService.awaitTermination(30, TimeUnit.SECONDS)) {
- LOGGER.warn("Pipe progressIndex background service did not terminate
within {}s", 30);
- }
- } catch (InterruptedException e) {
- LOGGER.warn(
- "Pipe progressIndex background service is interrupted while waiting
for termination");
- Thread.currentThread().interrupt();
- }
- }
-
public PipePeriodicalJobExecutor() {
super(
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index c61f4dc95fd..cf5bd4f0eb9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -522,7 +522,6 @@ public abstract class PipeTaskAgent {
// but the pipe task meta has not been cleaned up (in case of failure when
executing
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
- existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
// Drop pipe tasks
final Map<Integer, PipeTask> pipeTasks =
@@ -564,7 +563,6 @@ public abstract class PipeTaskAgent {
// but the pipe task meta has not been cleaned up (in case of failure when
executing
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
- existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
// Drop pipe tasks
final Map<Integer, PipeTask> pipeTasks =
@@ -657,7 +655,6 @@ public abstract class PipeTaskAgent {
// Set pipe meta status to STOPPED
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
- existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
}
////////////////////////// Checker //////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
index c71156a234b..997278010e9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.commons.pipe.agent.task.meta;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
import org.apache.tsfile.utils.PublicBAOS;
import java.io.DataOutputStream;
@@ -83,9 +81,7 @@ public class PipeMeta {
public static PipeMeta deserialize4TaskAgent(final ByteBuffer byteBuffer) {
final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
- final PipeRuntimeMeta runtimeMeta =
- PipeRuntimeMeta.deserialize(
- byteBuffer,
PipeConfig.getInstance().isPipeProgressIndexPersistEnabled());
+ final PipeRuntimeMeta runtimeMeta =
PipeRuntimeMeta.deserialize(byteBuffer);
return new PipeMeta(
staticMeta,
runtimeMeta,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index ff77564bd2f..e4beaf20bbf 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -110,10 +110,6 @@ public class PipeRuntimeMeta {
return status;
}
- public void onSetPipeDroppedOrStopped() {
-
consensusGroupId2TaskMetaMap.values().forEach(PipeTaskMeta::cancelPersistProgressIndexFuture);
- }
-
public ConcurrentMap<Integer, PipeTaskMeta>
getConsensusGroupId2TaskMetaMap() {
return consensusGroupId2TaskMetaMap;
}
@@ -140,15 +136,6 @@ public class PipeRuntimeMeta {
this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
}
- public void persistProgressIndex() {
- // Iterate through all the task metas and persist their progress index
- for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
- if (taskMeta.getProgressIndex() != null) {
- taskMeta.persistProgressIndex();
- }
- }
- }
-
/**
* We use negative regionId to identify the external pipe source, which is
not a consensus group
* id. Then we can reuse the regionId to schedule the external pipe source
and store the progress
@@ -217,11 +204,9 @@ public class PipeRuntimeMeta {
final int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
- final int taskIndex = ReadWriteIOUtils.readInt(inputStream);
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
- taskIndex,
- PipeTaskMeta.deserialize(
- PipeRuntimeMetaVersion.VERSION_1, inputStream, taskIndex,
false));
+ ReadWriteIOUtils.readInt(inputStream),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1,
inputStream));
}
return pipeRuntimeMeta;
@@ -234,11 +219,9 @@ public class PipeRuntimeMeta {
int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
- final int taskIndex = ReadWriteIOUtils.readInt(inputStream);
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
- taskIndex,
- PipeTaskMeta.deserialize(
- PipeRuntimeMetaVersion.VERSION_2, inputStream, taskIndex,
false));
+ ReadWriteIOUtils.readInt(inputStream),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2,
inputStream));
}
size = ReadWriteIOUtils.readInt(inputStream);
@@ -255,19 +238,14 @@ public class PipeRuntimeMeta {
}
public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) {
- return deserialize(byteBuffer, false);
- }
-
- public static PipeRuntimeMeta deserialize(
- final ByteBuffer byteBuffer, final boolean needPersist) {
final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(byteBuffer);
final PipeRuntimeMetaVersion pipeRuntimeMetaVersion =
PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte);
switch (pipeRuntimeMetaVersion) {
case VERSION_1:
- return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte,
needPersist);
+ return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte);
case VERSION_2:
- return deserializeVersion2(byteBuffer, needPersist);
+ return deserializeVersion2(byteBuffer);
default:
throw new UnsupportedOperationException(
"Unknown pipe runtime meta version: " +
pipeRuntimeMetaVersion.getVersion());
@@ -275,36 +253,31 @@ public class PipeRuntimeMeta {
}
private static PipeRuntimeMeta deserializeVersion1(
- ByteBuffer byteBuffer, byte pipeRuntimeVersionByte, final boolean
needPersist) {
+ ByteBuffer byteBuffer, byte pipeRuntimeVersionByte) {
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeRuntimeVersionByte));
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
- final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer);
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
- taskIndex,
- PipeTaskMeta.deserialize(
- PipeRuntimeMetaVersion.VERSION_1, byteBuffer, taskIndex,
needPersist));
+ ReadWriteIOUtils.readInt(byteBuffer),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1,
byteBuffer));
}
return pipeRuntimeMeta;
}
- public static PipeRuntimeMeta deserializeVersion2(
- ByteBuffer byteBuffer, final boolean needPersist) {
+ public static PipeRuntimeMeta deserializeVersion2(ByteBuffer byteBuffer) {
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer)));
int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
- final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer);
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
- taskIndex,
- PipeTaskMeta.deserialize(
- PipeRuntimeMetaVersion.VERSION_2, byteBuffer, taskIndex,
needPersist));
+ ReadWriteIOUtils.readInt(byteBuffer),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2,
byteBuffer));
}
size = ReadWriteIOUtils.readInt(byteBuffer);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 4a753c0e5bf..627ae1fbf9a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.commons.pipe.agent.task.meta;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
@@ -27,51 +26,26 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeExceptionType;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.commons.io.FileUtils;
-import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class PipeTaskMeta {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskMeta.class);
- private static final String PREFIX = "__progressIndex_";
-
private final AtomicReference<ProgressIndex> progressIndex = new
AtomicReference<>();
private final AtomicInteger leaderNodeId = new AtomicInteger(0);
- private final AtomicLong updateCount = new AtomicLong(0);
- private final AtomicLong lastPersistCount = new AtomicLong(0);
- private final long checkPointGap =
- PipeConfig.getInstance().getPipeProgressIndexPersistCheckPointGap();
- private File progressIndexPersistFile;
- private final AtomicBoolean isRegisterPersistTask = new AtomicBoolean(false);
- private Future<?> persistProgressIndexFuture;
-
/**
* Stores the exceptions encountered during run time of each pipe task.
*
@@ -84,26 +58,9 @@ public class PipeTaskMeta {
private final Set<PipeRuntimeException> exceptionMessages =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- public PipeTaskMeta(
- /* @NotNull */ final ProgressIndex progressIndex,
- final int leaderNodeId,
- final int taskIndex,
- final boolean needPersistProgressIndex) {
+ public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final
int leaderNodeId) {
this.progressIndex.set(progressIndex);
this.leaderNodeId.set(leaderNodeId);
- // PipeTaskMeta created in configNode doesn't need to persist progress
index.
- if (needPersistProgressIndex) {
- this.progressIndexPersistFile =
- new File(
- IoTDBConstant.DN_DEFAULT_DATA_DIR
- + File.separator
- + IoTDBConstant.SYSTEM_FOLDER_NAME
- + File.separator
- + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
- + File.separator
- +
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
- PREFIX + taskIndex);
- }
}
public ProgressIndex getProgressIndex() {
@@ -111,86 +68,8 @@ public class PipeTaskMeta {
}
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
- // only pipeTaskMeta that need to updateProgressIndex will persist
progress index
- // isRegisterPersistTask is used to avoid multiple threads registering
persist task concurrently
- if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
- && this.persistProgressIndexFuture == null
- && !isRegisterPersistTask.getAndSet(true)) {
- this.persistProgressIndexFuture =
- PipePeriodicalJobExecutor.submitBackgroundJob(
- this::persistProgressIndex,
- 0,
- PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
- }
-
- progressIndex.updateAndGet(
+ return progressIndex.updateAndGet(
index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
-
- if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
- && updateCount.incrementAndGet() - lastPersistCount.get() >
checkPointGap) {
- persistProgressIndex();
- }
-
- return progressIndex.get();
- }
-
- public synchronized void persistProgressIndex() {
- if (Objects.isNull(progressIndexPersistFile)
- // in case of multiple threads calling updateProgressIndex at the same
time
- || lastPersistCount.get() == updateCount.get()) {
- return;
- }
-
- try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
- final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- progressIndex.get().serialize(outputStream);
- // append is false by default.
- FileUtils.writeByteArrayToFile(
- progressIndexPersistFile,
- byteArrayOutputStream.getBuf(),
- 0,
- byteArrayOutputStream.size());
- lastPersistCount.set(updateCount.get());
- } catch (IOException e) {
- LOGGER.warn("Failed to persist progress index {} for {}",
progressIndex.get(), this, e);
- }
- }
-
- public ProgressIndex restoreProgressIndex() {
- if (!progressIndexPersistFile.exists() ||
progressIndexPersistFile.length() == 0) {
- return progressIndex.get();
- }
-
- try {
- final byte[] fileData =
Files.readAllBytes(progressIndexPersistFile.toPath());
-
- try (final ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(fileData);
- final DataInputStream inputStream = new
DataInputStream(byteArrayInputStream)) {
- final ProgressIndex restoredIndex =
ProgressIndexType.deserializeFrom(inputStream);
-
progressIndex.get().updateToMinimumEqualOrIsAfterProgressIndex(restoredIndex);
- LOGGER.info(
- "{} successfully restored progress index from [{}], current index:
{}",
- this,
- progressIndexPersistFile.getAbsolutePath(),
- progressIndex.get());
- }
- } catch (final IOException e) {
- LOGGER.warn(
- "{} failed to restore progress index from [{}].",
- this,
- progressIndexPersistFile.getAbsolutePath(),
- e);
- }
- return progressIndex.get();
- }
-
- public void cancelPersistProgressIndexFuture() {
- if (Objects.nonNull(progressIndexPersistFile)
- && isRegisterPersistTask.getAndSet(false)
- && persistProgressIndexFuture != null) {
- persistProgressIndexFuture.cancel(false);
- persistProgressIndexFuture = null;
- }
}
public int getLeaderNodeId() {
@@ -242,16 +121,12 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(
- final PipeRuntimeMetaVersion version,
- final ByteBuffer byteBuffer,
- final int taskIndex,
- final boolean needPersist) {
+ final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(byteBuffer);
final int leaderNodeId = ReadWriteIOUtils.readInt(byteBuffer);
- final PipeTaskMeta pipeTaskMeta =
- new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist);
+ final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex,
leaderNodeId);
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final PipeRuntimeException pipeRuntimeException =
@@ -262,17 +137,12 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(
- final PipeRuntimeMetaVersion version,
- final InputStream inputStream,
- final int taskIndex,
- final boolean needPersist)
- throws IOException {
+ final PipeRuntimeMetaVersion version, final InputStream inputStream)
throws IOException {
final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(inputStream);
final int leaderNodeId = ReadWriteIOUtils.readInt(inputStream);
- final PipeTaskMeta pipeTaskMeta =
- new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist);
+ final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex,
leaderNodeId);
final int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final PipeRuntimeException pipeRuntimeException =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index af239c22861..537bb6fbb7c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -46,22 +46,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeHardlinkTsFileDirName();
}
- public String getPipeProgressIndexPersistDirName() {
- return COMMON_CONFIG.getPipeProgressIndexPersistDirName();
- }
-
- public boolean isPipeProgressIndexPersistEnabled() {
- return COMMON_CONFIG.isPipeProgressIndexPersistEnabled();
- }
-
- public long getPipeProgressIndexPersistCheckPointGap() {
- return COMMON_CONFIG.getPipeProgressIndexPersistCheckPointGap();
- }
-
- public long getPipeProgressIndexFlushIntervalMs() {
- return COMMON_CONFIG.getPipeProgressIndexFlushIntervalMs();
- }
-
public String getPipeHardlinkWALDirName() {
return COMMON_CONFIG.getPipeHardlinkWALDirName();
}
@@ -486,11 +470,6 @@ public class PipeConfig {
LOGGER.info("PipeHardlinkBaseDirName: {}", getPipeHardlinkBaseDirName());
LOGGER.info("PipeHardlinkTsFileDirName: {}",
getPipeHardlinkTsFileDirName());
- LOGGER.info("PipeProgressIndexPersistDirName: {}",
getPipeProgressIndexPersistDirName());
- LOGGER.info("PipeProgressIndexPersistEnabled: {}",
isPipeProgressIndexPersistEnabled());
- LOGGER.info(
- "PipeProgressIndexPersistCheckPointGap: {}",
getPipeProgressIndexPersistCheckPointGap());
- LOGGER.info("PipeProgressIndexFlushIntervalMs: {}",
getPipeProgressIndexFlushIntervalMs());
LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
LOGGER.info("PipeFileReceiverFsyncEnabled: {}",
getPipeFileReceiverFsyncEnabled());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 8bfe59f30bb..5f81ed15270 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -46,9 +46,6 @@ public class PipeDescriptor {
config.setPipeHardlinkTsFileDirName(
properties.getProperty(
"pipe_hardlink_tsfile_dir_name",
config.getPipeHardlinkTsFileDirName()));
- config.setPipeProgressIndexPersistDirName(
- properties.getProperty(
- "pipe_progress_index_persist_dir_name",
config.getPipeProgressIndexPersistDirName()));
config.setPipeHardlinkWALDirName(
properties.getProperty("pipe_hardlink_wal_dir_name",
config.getPipeHardlinkWALDirName()));
config.setPipeHardLinkWALEnabled(
@@ -100,21 +97,6 @@ public class PipeDescriptor {
Boolean.parseBoolean(
properties.getProperty(
"pipe_auto_restart_enabled",
String.valueOf(config.getPipeAutoRestartEnabled()))));
- config.setPipeProgressIndexPersistEnabled(
- Boolean.parseBoolean(
- properties.getProperty(
- "pipe_progress_index_persist_enabled",
- String.valueOf(config.isPipeProgressIndexPersistEnabled()))));
- config.setPipeProgressIndexPersistCheckPointGap(
- Long.parseLong(
- properties.getProperty(
- "pipe_progress_index_persist_check_point_gap",
-
String.valueOf(config.getPipeProgressIndexPersistCheckPointGap()))));
- config.setPipeProgressIndexFlushIntervalMs(
- Long.parseLong(
- properties.getProperty(
- "pipe_progress_index_flush_interval_ms",
-
String.valueOf(config.getPipeProgressIndexFlushIntervalMs()))));
config.setPipeAirGapReceiverEnabled(
Boolean.parseBoolean(
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
index d4e6008547c..8e04baf7cc0 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
@@ -97,27 +97,20 @@ public class PipeMetaDeSerTest {
new PipeRuntimeMeta(
new ConcurrentHashMap<Integer, PipeTaskMeta>() {
{
- put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987,
123, false));
- put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789,
234, false));
- put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789,
345, false));
- put(456, new PipeTaskMeta(finalHybridProgressIndex, 789, 456,
false));
+ put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987));
+ put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789));
+ put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789));
+ put(456, new PipeTaskMeta(finalHybridProgressIndex, 789));
put(
567,
new PipeTaskMeta(
- new RecoverProgressIndex(1, new SimpleProgressIndex(1,
9)),
- 123,
- 567,
- false));
+ new RecoverProgressIndex(1, new SimpleProgressIndex(1,
9)), 123));
put(
678,
new PipeTaskMeta(
new
TimeWindowStateProgressIndex(timeSeries2TimestampWindowBufferPairMap),
- 789,
- 678,
- false));
- put(
- Integer.MIN_VALUE,
- new PipeTaskMeta(new MetaProgressIndex(987), 0,
Integer.MIN_VALUE, false));
+ 789));
+ put(Integer.MIN_VALUE, new PipeTaskMeta(new
MetaProgressIndex(987), 0));
}
});
ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
@@ -136,7 +129,6 @@ public class PipeMetaDeSerTest {
Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1);
pipeRuntimeMeta.getStatus().set(PipeStatus.DROPPED);
- pipeRuntimeMeta.onSetPipeDroppedOrStopped();
pipeRuntimeMeta.setIsStoppedByRuntimeException(true);
pipeRuntimeMeta.setExceptionsClearTime(0);
pipeRuntimeMeta