This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ae14450ad49 [To dev/1.3] Revert Pipe: Try to persist progressIndex in
local for resend event more efficiently (#15599) (#15926)
ae14450ad49 is described below
commit ae14450ad498da3b21c82d083ed82b53c448901a
Author: Caideyipi <[email protected]>
AuthorDate: Sat Jul 12 17:08:59 2025 +0800
[To dev/1.3] Revert Pipe: Try to persist progressIndex in local for resend
event more efficiently (#15599) (#15926)
* Revert "[To dev/1.3] Pipe/IoTV2: Try to persist progressIndex in local
for resend event more efficiently (#15599) (#15669)"
This reverts commit 59974f42a832f2e9124a7f6ecab652e463272b3d.
* Update DataNodeShutdownHook.java
* continue revert
* fix
---
.../runtime/heartbeat/PipeHeartbeatParser.java | 2 -
.../confignode/persistence/pipe/PipeTaskInfo.java | 7 +-
.../impl/pipe/task/AlterPipeProcedureV2.java | 10 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 14 +-
.../confignode/service/ConfigNodeShutdownHook.java | 3 -
.../request/ConfigPhysicalPlanSerDeTest.java | 18 +--
.../consensus/response/pipe/PipeTableRespTest.java | 6 +-
.../agent/PipeConfigNodeSubtaskExecutorTest.java | 3 +-
.../iotdb/confignode/persistence/PipeInfoTest.java | 4 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 27 ----
.../PipeHistoricalDataRegionTsFileExtractor.java | 5 +-
.../iotdb/db/service/DataNodeShutdownHook.java | 3 -
.../iotdb/commons/concurrent/ThreadName.java | 2 -
.../apache/iotdb/commons/conf/CommonConfig.java | 55 --------
.../agent/runtime/PipePeriodicalJobExecutor.java | 33 -----
.../commons/pipe/agent/task/PipeTaskAgent.java | 3 -
.../commons/pipe/agent/task/meta/PipeMeta.java | 6 +-
.../pipe/agent/task/meta/PipeRuntimeMeta.java | 51 ++------
.../commons/pipe/agent/task/meta/PipeTaskMeta.java | 142 +--------------------
.../iotdb/commons/pipe/config/PipeConfig.java | 21 ---
.../iotdb/commons/pipe/config/PipeDescriptor.java | 18 ---
.../iotdb/commons/pipe/task/PipeMetaDeSerTest.java | 22 +---
22 files changed, 48 insertions(+), 407 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 6f6c0b2d443..e2303fecdea 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -244,7 +244,6 @@ public class PipeHeartbeatParser {
.equals(PipeStatus.STOPPED)) {
PipeRuntimeMeta runtimeMeta =
pipeMetaFromCoordinator.getRuntimeMeta();
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
- runtimeMeta.onSetPipeDroppedOrStopped();
runtimeMeta.setIsStoppedByRuntimeException(true);
needWriteConsensusOnConfigNodes.set(true);
@@ -274,7 +273,6 @@ public class PipeHeartbeatParser {
exceptionMap.put(nodeId, exception);
}
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
- runtimeMeta.onSetPipeDroppedOrStopped();
runtimeMeta.setIsStoppedByRuntimeException(true);
needWriteConsensusOnConfigNodes.set(true);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 880271f8d01..372209bfd74 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -616,11 +616,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
if (newLeader != -1) {
consensusGroupIdToTaskMetaMap.put(
consensusGroupId.getId(),
- new PipeTaskMeta(
- MinimumProgressIndex.INSTANCE,
- newLeader,
- consensusGroupId.getId(),
- false));
+ new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
}
// else:
// "The pipe task meta does not contain the data
region group {} or
@@ -794,7 +790,6 @@ public class PipeTaskInfo implements SnapshotProcessor {
// Mark the status of the pipe with exception as stopped
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
- runtimeMeta.onSetPipeDroppedOrStopped();
runtimeMeta.setIsStoppedByRuntimeException(true);
final Map<Integer, PipeRuntimeException> exceptionMap =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index b17795afa43..b11c74408a6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -155,11 +155,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
// Pipe only collect user's data, filter metric database here.
updatedConsensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
- new PipeTaskMeta(
- currentPipeTaskMeta.getProgressIndex(),
- regionLeaderNodeId,
- regionGroupId.getId(),
- false));
+ new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(),
regionLeaderNodeId));
}
});
@@ -174,9 +170,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
new PipeTaskMeta(
configRegionTaskMeta.getProgressIndex(),
// The leader of the config region is the config node itself
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- Integer.MIN_VALUE,
- false));
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
}
updatedPipeRuntimeMeta = new
PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 3b91aa07802..813d4ebe69e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -171,9 +171,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
groupId.getId(),
new PipeTaskMeta(
new RecoverProgressIndex(senderDataNodeId, new
SimpleProgressIndex(0, 0)),
- senderDataNodeId,
- groupId.getId(),
- false));
+ senderDataNodeId));
} else {
// data regions & schema regions
env.getConfigManager()
@@ -189,11 +187,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
// Pipe only collect user's data, filter out metric database
here.
consensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
- new PipeTaskMeta(
- MinimumProgressIndex.INSTANCE,
- regionLeaderNodeId,
- regionGroupId.getId(),
- false));
+ new PipeTaskMeta(MinimumProgressIndex.INSTANCE,
regionLeaderNodeId));
}
});
@@ -206,9 +200,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
new PipeTaskMeta(
MinimumProgressIndex.INSTANCE,
// The leader of the config region is the config node itself
- ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- Integer.MIN_VALUE,
- false));
+ ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
}
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
index bd12adbd804..5c3ec5af063 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -88,8 +87,6 @@ public class ConfigNodeShutdownHook extends Thread {
"Reporting ConfigNode shutdown failed. The cluster will still take
the current ConfigNode as Running for a few seconds.");
}
}
- // Shutdown pipe progressIndex background service
- PipePeriodicalJobExecutor.shutdownBackgroundService();
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index c09dafb1d29..be2ba4b6960 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -849,7 +849,7 @@ public class ConfigPhysicalPlanSerDeTest {
extractorAttributes.put("extractor",
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
processorAttributes.put("processor",
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector",
"org.apache.iotdb.pipe.protocol.ThriftTransporter");
- final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
@@ -874,8 +874,8 @@ public class ConfigPhysicalPlanSerDeTest {
extractorAttributes.put("pattern", "root.db");
processorAttributes.put("processor", "do-nothing-processor");
connectorAttributes.put("batch.enable", "false");
- final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
- final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new
ConcurrentHashMap<>();
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+ ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
new PipeStaticMeta(
@@ -912,8 +912,8 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void OperateMultiplePipesPlanV2Test() throws IOException {
- final PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
- final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new
ConcurrentHashMap<>();
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+ ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
new PipeStaticMeta(
@@ -925,8 +925,8 @@ public class ConfigPhysicalPlanSerDeTest {
PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta,
pipeRuntimeMeta);
- final PipeTaskMeta pipeTaskMeta1 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2, 2, false);
- final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new
ConcurrentHashMap<>();
+ PipeTaskMeta pipeTaskMeta1 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
+ ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new
ConcurrentHashMap<>();
pipeTasks.put(2, pipeTaskMeta1);
PipeStaticMeta pipeStaticMeta1 =
new PipeStaticMeta(
@@ -1024,8 +1024,8 @@ public class ConfigPhysicalPlanSerDeTest {
new PipeRuntimeMeta(
new ConcurrentHashMap<Integer, PipeTaskMeta>() {
{
- put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987, 1,
false));
- put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789,
1, false));
+ put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987));
+ put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789));
}
});
pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
index 04dea675015..94189a19d99 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
@@ -54,7 +54,7 @@ public class PipeTableRespTest {
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");
- PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
@@ -74,7 +74,7 @@ public class PipeTableRespTest {
connectorAttributes1.put("host", "127.0.0.1");
connectorAttributes1.put("port", "6667");
- PipeTaskMeta pipeTaskMeta1 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta1 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new
ConcurrentHashMap<>();
pipeTasks1.put(1, pipeTaskMeta1);
PipeStaticMeta pipeStaticMeta1 =
@@ -94,7 +94,7 @@ public class PipeTableRespTest {
connectorAttributes2.put("host", "172.30.30.30");
connectorAttributes2.put("port", "6667");
- PipeTaskMeta pipeTaskMeta2 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta2 = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks2 = new
ConcurrentHashMap<>();
pipeTasks2.put(1, pipeTaskMeta2);
PipeStaticMeta pipeStaticMeta2 =
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
index a43a87b120e..f2fa5b0205a 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
@@ -62,8 +62,7 @@ public class PipeConfigNodeSubtaskExecutorTest {
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName());
}
},
- new PipeTaskMeta(
- MinimumProgressIndex.INSTANCE, Integer.MIN_VALUE,
Integer.MIN_VALUE, false)));
+ new PipeTaskMeta(MinimumProgressIndex.INSTANCE,
Integer.MIN_VALUE)));
}
@After
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 815d5c1757e..c3e7916108f 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -86,7 +86,7 @@ public class PipeInfoTest {
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");
- PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
@@ -121,7 +121,7 @@ public class PipeInfoTest {
extractorAttributes.put("extractor",
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
processorAttributes.put("processor",
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
connectorAttributes.put("connector",
"org.apache.iotdb.pipe.protocol.ThriftTransporter");
- PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+ PipeTaskMeta pipeTaskMeta = new
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 5d21e2970de..b840b9274aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -825,11 +825,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Shutdown Logic /////////////////////////
public void persistAllProgressIndex() {
- persistAllProgressIndexLocally();
- persistAllProgressIndex2ConfigNode();
- }
-
- private void persistAllProgressIndex2ConfigNode() {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
// Send request to some API server
@@ -848,28 +843,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
- private void persistAllProgressIndexLocally() {
- if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
- LOGGER.info(
- "Pipe progress index persist disabled. Skipping persist all progress
index locally.");
- return;
- }
- if (!tryReadLockWithTimeOut(10)) {
- LOGGER.info("Failed to persist all progress index locally because of
timeout.");
- return;
- }
- try {
- for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
- pipeMeta.getRuntimeMeta().persistProgressIndex();
- }
- LOGGER.info("Persist all progress index locally successfully.");
- } catch (final Exception e) {
- LOGGER.warn("Failed to record all progress index locally, because {}.",
e.getMessage(), e);
- } finally {
- releaseReadLock();
- }
- }
-
///////////////////////// Pipe Consensus /////////////////////////
public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final
int consensusGroupId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 3ea98cf5bb3..5ad3c6039f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -267,10 +267,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
pipeName = environment.getPipeName();
creationTime = environment.getCreationTime();
pipeTaskMeta = environment.getPipeTaskMeta();
-
- // progressIndex is immutable in
`updateToMinimumEqualOrIsAfterProgressIndex`, so data
- // consistency in `environment.getPipeTaskMeta().getProgressIndex()` is
ensured.
- startIndex = environment.getPipeTaskMeta().restoreProgressIndex();
+ startIndex = environment.getPipeTaskMeta().getProgressIndex();
dataRegionId = environment.getRegionId();
synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 8dd3d39c92c..5b62a0d614c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -24,7 +24,6 @@ import
org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -123,8 +122,6 @@ public class DataNodeShutdownHook extends Thread {
}
// Persist progress index before shutdown to accurate recovery after
restart
PipeDataNodeAgent.task().persistAllProgressIndex();
- // Shutdown pipe progressIndex background service
- PipePeriodicalJobExecutor.shutdownBackgroundService();
// Actually stop all services started by the DataNode.
// If we don't call this, services like the RestService are not stopped
and I can't re-start
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 19aff15004d..cf23e22b4ce 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -142,7 +142,6 @@ public enum ThreadName {
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
-
PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE("Pipe-Progress-Index-Background-Service"),
LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"),
SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
@@ -298,7 +297,6 @@ public enum ThreadName {
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
PIPE_RECEIVER_AIR_GAP_AGENT,
PIPE_AIR_GAP_RECEIVER,
- PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE,
SUBSCRIPTION_EXECUTOR_POOL,
SUBSCRIPTION_RUNTIME_META_SYNCER,
WINDOW_EVALUATION_SERVICE,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index f7adde82e56..bf1ef7d25a8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -199,8 +199,6 @@ public class CommonConfig {
private String pipeHardlinkTsFileDirName = "tsfile";
- private String pipeProgressIndexPersistDirName = "progress";
-
private String pipeHardlinkWALDirName = "wal";
private boolean pipeHardLinkWALEnabled = false;
@@ -268,9 +266,6 @@ public class CommonConfig {
private long pipeMetaSyncerSyncIntervalMinutes = 3;
private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
private boolean pipeAutoRestartEnabled = true;
- private boolean pipeProgressIndexPersistEnabled = true;
- private long pipeProgressIndexPersistCheckPointGap = 20;
- private long pipeProgressIndexFlushIntervalMs = 20 * 1000L;
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;
@@ -768,18 +763,6 @@ public class CommonConfig {
logger.info("pipeHardlinkTsFileDirName is set to {}.", pipeTsFileDirName);
}
- public String getPipeProgressIndexPersistDirName() {
- return pipeProgressIndexPersistDirName;
- }
-
- public void setPipeProgressIndexPersistDirName(String
pipeProgressIndexPersistDirName) {
- if (Objects.equals(this.pipeProgressIndexPersistDirName,
pipeProgressIndexPersistDirName)) {
- return;
- }
- this.pipeProgressIndexPersistDirName = pipeProgressIndexPersistDirName;
- logger.info("pipeProgressIndexPersistDir is set to {}.",
pipeProgressIndexPersistDirName);
- }
-
public String getPipeHardlinkWALDirName() {
return pipeHardlinkWALDirName;
}
@@ -1238,44 +1221,6 @@ public class CommonConfig {
logger.info("pipeAutoRestartEnabled is set to {}.",
pipeAutoRestartEnabled);
}
- public boolean isPipeProgressIndexPersistEnabled() {
- return pipeProgressIndexPersistEnabled;
- }
-
- public void setPipeProgressIndexPersistEnabled(boolean
pipeProgressIndexPersistEnabled) {
- if (this.pipeProgressIndexPersistEnabled ==
pipeProgressIndexPersistEnabled) {
- return;
- }
- this.pipeProgressIndexPersistEnabled = pipeProgressIndexPersistEnabled;
- logger.info("pipeProgressIndexPersistEnabled is set to {}.",
pipeProgressIndexPersistEnabled);
- }
-
- public long getPipeProgressIndexPersistCheckPointGap() {
- return pipeProgressIndexPersistCheckPointGap;
- }
-
- public void setPipeProgressIndexPersistCheckPointGap(long
pipeProgressIndexPersistCheckPointGap) {
- if (this.pipeProgressIndexPersistCheckPointGap ==
pipeProgressIndexPersistCheckPointGap) {
- return;
- }
- this.pipeProgressIndexPersistCheckPointGap =
pipeProgressIndexPersistCheckPointGap;
- logger.info(
- "pipeProgressIndexPersistCheckPointGap is set to {}.",
- pipeProgressIndexPersistCheckPointGap);
- }
-
- public long getPipeProgressIndexFlushIntervalMs() {
- return pipeProgressIndexFlushIntervalMs;
- }
-
- public void setPipeProgressIndexFlushIntervalMs(long
pipeProgressIndexFlushIntervalMs) {
- if (this.pipeProgressIndexFlushIntervalMs ==
pipeProgressIndexFlushIntervalMs) {
- return;
- }
- this.pipeProgressIndexFlushIntervalMs = pipeProgressIndexFlushIntervalMs;
- logger.info("pipeProgressIndexFlushIntervalMs is set to {}.",
pipeProgressIndexFlushIntervalMs);
- }
-
public long getPipeConnectorRetryIntervalMs() {
return pipeConnectorRetryIntervalMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index 33ac03c5c96..3226b3947f0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -21,16 +21,8 @@ package org.apache.iotdb.commons.pipe.agent.runtime;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
/**
* The shortest scheduling cycle for these jobs is {@link
* PipeConfig#getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()},
suitable for jobs that are
@@ -38,31 +30,6 @@ import java.util.concurrent.TimeUnit;
*/
public class PipePeriodicalJobExecutor extends
AbstractPipePeriodicalJobExecutor {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
- // This background service is used to execute jobs that need to be cancelled
and released.
- private static final ScheduledExecutorService backgroundService =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE.getName());
-
- public static Future<?> submitBackgroundJob(
- Runnable job, long initialDelayInMs, long periodInMs) {
- return ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- backgroundService, job, initialDelayInMs, periodInMs,
TimeUnit.MILLISECONDS);
- }
-
- public static void shutdownBackgroundService() {
- backgroundService.shutdownNow();
- try {
- if (!backgroundService.awaitTermination(30, TimeUnit.SECONDS)) {
- LOGGER.warn("Pipe progressIndex background service did not terminate
within {}s", 30);
- }
- } catch (InterruptedException e) {
- LOGGER.warn(
- "Pipe progressIndex background service is interrupted while waiting
for termination");
- Thread.currentThread().interrupt();
- }
- }
-
public PipePeriodicalJobExecutor() {
super(
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index a88232161c8..9818b43acea 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -520,7 +520,6 @@ public abstract class PipeTaskAgent {
// but the pipe task meta has not been cleaned up (in case of failure when
executing
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
- existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
// Drop pipe tasks
final Map<Integer, PipeTask> pipeTasks =
@@ -562,7 +561,6 @@ public abstract class PipeTaskAgent {
// but the pipe task meta has not been cleaned up (in case of failure when
executing
// dropPipeTaskByConsensusGroup).
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
- existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
// Drop pipe tasks
final Map<Integer, PipeTask> pipeTasks =
@@ -655,7 +653,6 @@ public abstract class PipeTaskAgent {
// Set pipe meta status to STOPPED
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
- existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
}
////////////////////////// Checker //////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
index c71156a234b..997278010e9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.commons.pipe.agent.task.meta;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
import org.apache.tsfile.utils.PublicBAOS;
import java.io.DataOutputStream;
@@ -83,9 +81,7 @@ public class PipeMeta {
public static PipeMeta deserialize4TaskAgent(final ByteBuffer byteBuffer) {
final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
- final PipeRuntimeMeta runtimeMeta =
- PipeRuntimeMeta.deserialize(
- byteBuffer,
PipeConfig.getInstance().isPipeProgressIndexPersistEnabled());
+ final PipeRuntimeMeta runtimeMeta =
PipeRuntimeMeta.deserialize(byteBuffer);
return new PipeMeta(
staticMeta,
runtimeMeta,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index 402a601e52b..5223b6a9e8a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -110,10 +110,6 @@ public class PipeRuntimeMeta {
return status;
}
- public void onSetPipeDroppedOrStopped() {
-
consensusGroupId2TaskMetaMap.values().forEach(PipeTaskMeta::cancelPersistProgressIndexFuture);
- }
-
public ConcurrentMap<Integer, PipeTaskMeta>
getConsensusGroupId2TaskMetaMap() {
return consensusGroupId2TaskMetaMap;
}
@@ -140,15 +136,6 @@ public class PipeRuntimeMeta {
this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
}
- public void persistProgressIndex() {
- // Iterate through all the task metas and persist their progress index
- for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
- if (taskMeta.getProgressIndex() != null) {
- taskMeta.persistProgressIndex();
- }
- }
- }
-
public ByteBuffer serialize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
@@ -207,11 +194,9 @@ public class PipeRuntimeMeta {
final int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
- final int taskIndex = ReadWriteIOUtils.readInt(inputStream);
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
- taskIndex,
- PipeTaskMeta.deserialize(
- PipeRuntimeMetaVersion.VERSION_1, inputStream, taskIndex,
false));
+ ReadWriteIOUtils.readInt(inputStream),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1,
inputStream));
}
return pipeRuntimeMeta;
@@ -224,11 +209,9 @@ public class PipeRuntimeMeta {
int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
- final int taskIndex = ReadWriteIOUtils.readInt(inputStream);
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
- taskIndex,
- PipeTaskMeta.deserialize(
- PipeRuntimeMetaVersion.VERSION_2, inputStream, taskIndex,
false));
+ ReadWriteIOUtils.readInt(inputStream),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2,
inputStream));
}
size = ReadWriteIOUtils.readInt(inputStream);
@@ -245,19 +228,14 @@ public class PipeRuntimeMeta {
}
public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) {
- return deserialize(byteBuffer, false);
- }
-
- public static PipeRuntimeMeta deserialize(
- final ByteBuffer byteBuffer, final boolean needPersist) {
final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(byteBuffer);
final PipeRuntimeMetaVersion pipeRuntimeMetaVersion =
PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte);
switch (pipeRuntimeMetaVersion) {
case VERSION_1:
- return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte,
needPersist);
+ return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte);
case VERSION_2:
- return deserializeVersion2(byteBuffer, needPersist);
+ return deserializeVersion2(byteBuffer);
default:
throw new UnsupportedOperationException(
"Unknown pipe runtime meta version: " +
pipeRuntimeMetaVersion.getVersion());
@@ -265,36 +243,31 @@ public class PipeRuntimeMeta {
}
private static PipeRuntimeMeta deserializeVersion1(
- ByteBuffer byteBuffer, byte pipeRuntimeVersionByte, final boolean
needPersist) {
+ ByteBuffer byteBuffer, byte pipeRuntimeVersionByte) {
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeRuntimeVersionByte));
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
- final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer);
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
- taskIndex,
- PipeTaskMeta.deserialize(
- PipeRuntimeMetaVersion.VERSION_1, byteBuffer, taskIndex,
needPersist));
+ ReadWriteIOUtils.readInt(byteBuffer),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1,
byteBuffer));
}
return pipeRuntimeMeta;
}
- public static PipeRuntimeMeta deserializeVersion2(
- ByteBuffer byteBuffer, final boolean needPersist) {
+ public static PipeRuntimeMeta deserializeVersion2(ByteBuffer byteBuffer) {
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer)));
int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
- final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer);
pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
- taskIndex,
- PipeTaskMeta.deserialize(
- PipeRuntimeMetaVersion.VERSION_2, byteBuffer, taskIndex,
needPersist));
+ ReadWriteIOUtils.readInt(byteBuffer),
+ PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2,
byteBuffer));
}
size = ReadWriteIOUtils.readInt(byteBuffer);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 4a753c0e5bf..627ae1fbf9a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.commons.pipe.agent.task.meta;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
@@ -27,51 +26,26 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeExceptionType;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.commons.io.FileUtils;
-import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class PipeTaskMeta {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskMeta.class);
- private static final String PREFIX = "__progressIndex_";
-
private final AtomicReference<ProgressIndex> progressIndex = new
AtomicReference<>();
private final AtomicInteger leaderNodeId = new AtomicInteger(0);
- private final AtomicLong updateCount = new AtomicLong(0);
- private final AtomicLong lastPersistCount = new AtomicLong(0);
- private final long checkPointGap =
- PipeConfig.getInstance().getPipeProgressIndexPersistCheckPointGap();
- private File progressIndexPersistFile;
- private final AtomicBoolean isRegisterPersistTask = new AtomicBoolean(false);
- private Future<?> persistProgressIndexFuture;
-
/**
* Stores the exceptions encountered during run time of each pipe task.
*
@@ -84,26 +58,9 @@ public class PipeTaskMeta {
private final Set<PipeRuntimeException> exceptionMessages =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- public PipeTaskMeta(
- /* @NotNull */ final ProgressIndex progressIndex,
- final int leaderNodeId,
- final int taskIndex,
- final boolean needPersistProgressIndex) {
+ public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final
int leaderNodeId) {
this.progressIndex.set(progressIndex);
this.leaderNodeId.set(leaderNodeId);
- // PipeTaskMeta created in configNode doesn't need to persist progress
index.
- if (needPersistProgressIndex) {
- this.progressIndexPersistFile =
- new File(
- IoTDBConstant.DN_DEFAULT_DATA_DIR
- + File.separator
- + IoTDBConstant.SYSTEM_FOLDER_NAME
- + File.separator
- + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
- + File.separator
- +
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
- PREFIX + taskIndex);
- }
}
public ProgressIndex getProgressIndex() {
@@ -111,86 +68,8 @@ public class PipeTaskMeta {
}
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
- // only pipeTaskMeta that need to updateProgressIndex will persist
progress index
- // isRegisterPersistTask is used to avoid multiple threads registering
persist task concurrently
- if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
- && this.persistProgressIndexFuture == null
- && !isRegisterPersistTask.getAndSet(true)) {
- this.persistProgressIndexFuture =
- PipePeriodicalJobExecutor.submitBackgroundJob(
- this::persistProgressIndex,
- 0,
- PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
- }
-
- progressIndex.updateAndGet(
+ return progressIndex.updateAndGet(
index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
-
- if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
- && updateCount.incrementAndGet() - lastPersistCount.get() >
checkPointGap) {
- persistProgressIndex();
- }
-
- return progressIndex.get();
- }
-
- public synchronized void persistProgressIndex() {
- if (Objects.isNull(progressIndexPersistFile)
- // in case of multiple threads calling updateProgressIndex at the same
time
- || lastPersistCount.get() == updateCount.get()) {
- return;
- }
-
- try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
- final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- progressIndex.get().serialize(outputStream);
- // append is false by default.
- FileUtils.writeByteArrayToFile(
- progressIndexPersistFile,
- byteArrayOutputStream.getBuf(),
- 0,
- byteArrayOutputStream.size());
- lastPersistCount.set(updateCount.get());
- } catch (IOException e) {
- LOGGER.warn("Failed to persist progress index {} for {}",
progressIndex.get(), this, e);
- }
- }
-
- public ProgressIndex restoreProgressIndex() {
- if (!progressIndexPersistFile.exists() ||
progressIndexPersistFile.length() == 0) {
- return progressIndex.get();
- }
-
- try {
- final byte[] fileData =
Files.readAllBytes(progressIndexPersistFile.toPath());
-
- try (final ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(fileData);
- final DataInputStream inputStream = new
DataInputStream(byteArrayInputStream)) {
- final ProgressIndex restoredIndex =
ProgressIndexType.deserializeFrom(inputStream);
-
progressIndex.get().updateToMinimumEqualOrIsAfterProgressIndex(restoredIndex);
- LOGGER.info(
- "{} successfully restored progress index from [{}], current index:
{}",
- this,
- progressIndexPersistFile.getAbsolutePath(),
- progressIndex.get());
- }
- } catch (final IOException e) {
- LOGGER.warn(
- "{} failed to restore progress index from [{}].",
- this,
- progressIndexPersistFile.getAbsolutePath(),
- e);
- }
- return progressIndex.get();
- }
-
- public void cancelPersistProgressIndexFuture() {
- if (Objects.nonNull(progressIndexPersistFile)
- && isRegisterPersistTask.getAndSet(false)
- && persistProgressIndexFuture != null) {
- persistProgressIndexFuture.cancel(false);
- persistProgressIndexFuture = null;
- }
}
public int getLeaderNodeId() {
@@ -242,16 +121,12 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(
- final PipeRuntimeMetaVersion version,
- final ByteBuffer byteBuffer,
- final int taskIndex,
- final boolean needPersist) {
+ final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(byteBuffer);
final int leaderNodeId = ReadWriteIOUtils.readInt(byteBuffer);
- final PipeTaskMeta pipeTaskMeta =
- new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist);
+ final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex,
leaderNodeId);
final int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final PipeRuntimeException pipeRuntimeException =
@@ -262,17 +137,12 @@ public class PipeTaskMeta {
}
public static PipeTaskMeta deserialize(
- final PipeRuntimeMetaVersion version,
- final InputStream inputStream,
- final int taskIndex,
- final boolean needPersist)
- throws IOException {
+ final PipeRuntimeMetaVersion version, final InputStream inputStream)
throws IOException {
final ProgressIndex progressIndex =
ProgressIndexType.deserializeFrom(inputStream);
final int leaderNodeId = ReadWriteIOUtils.readInt(inputStream);
- final PipeTaskMeta pipeTaskMeta =
- new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist);
+ final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex,
leaderNodeId);
final int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final PipeRuntimeException pipeRuntimeException =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index fdbda7fefd4..01019b0abcb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -46,22 +46,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeHardlinkTsFileDirName();
}
- public String getPipeProgressIndexPersistDirName() {
- return COMMON_CONFIG.getPipeProgressIndexPersistDirName();
- }
-
- public boolean isPipeProgressIndexPersistEnabled() {
- return COMMON_CONFIG.isPipeProgressIndexPersistEnabled();
- }
-
- public long getPipeProgressIndexPersistCheckPointGap() {
- return COMMON_CONFIG.getPipeProgressIndexPersistCheckPointGap();
- }
-
- public long getPipeProgressIndexFlushIntervalMs() {
- return COMMON_CONFIG.getPipeProgressIndexFlushIntervalMs();
- }
-
public String getPipeHardlinkWALDirName() {
return COMMON_CONFIG.getPipeHardlinkWALDirName();
}
@@ -482,11 +466,6 @@ public class PipeConfig {
LOGGER.info("PipeHardlinkBaseDirName: {}", getPipeHardlinkBaseDirName());
LOGGER.info("PipeHardlinkTsFileDirName: {}",
getPipeHardlinkTsFileDirName());
- LOGGER.info("PipeProgressIndexPersistDirName: {}",
getPipeProgressIndexPersistDirName());
- LOGGER.info("PipeProgressIndexPersistEnabled: {}",
isPipeProgressIndexPersistEnabled());
- LOGGER.info(
- "PipeProgressIndexPersistCheckPointGap: {}",
getPipeProgressIndexPersistCheckPointGap());
- LOGGER.info("PipeProgressIndexFlushIntervalMs: {}",
getPipeProgressIndexFlushIntervalMs());
LOGGER.info("PipeHardlinkWALDirName: {}", getPipeHardlinkWALDirName());
LOGGER.info("PipeHardLinkWALEnabled: {}", getPipeHardLinkWALEnabled());
LOGGER.info("PipeFileReceiverFsyncEnabled: {}",
getPipeFileReceiverFsyncEnabled());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 852beda7ed9..d50eacaf036 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -46,9 +46,6 @@ public class PipeDescriptor {
config.setPipeHardlinkTsFileDirName(
properties.getProperty(
"pipe_hardlink_tsfile_dir_name",
config.getPipeHardlinkTsFileDirName()));
- config.setPipeProgressIndexPersistDirName(
- properties.getProperty(
- "pipe_progress_index_persist_dir_name",
config.getPipeProgressIndexPersistDirName()));
config.setPipeHardlinkWALDirName(
properties.getProperty("pipe_hardlink_wal_dir_name",
config.getPipeHardlinkWALDirName()));
config.setPipeHardLinkWALEnabled(
@@ -100,21 +97,6 @@ public class PipeDescriptor {
Boolean.parseBoolean(
properties.getProperty(
"pipe_auto_restart_enabled",
String.valueOf(config.getPipeAutoRestartEnabled()))));
- config.setPipeProgressIndexPersistEnabled(
- Boolean.parseBoolean(
- properties.getProperty(
- "pipe_progress_index_persist_enabled",
- String.valueOf(config.isPipeProgressIndexPersistEnabled()))));
- config.setPipeProgressIndexPersistCheckPointGap(
- Long.parseLong(
- properties.getProperty(
- "pipe_progress_index_persist_check_point_gap",
-
String.valueOf(config.getPipeProgressIndexPersistCheckPointGap()))));
- config.setPipeProgressIndexFlushIntervalMs(
- Long.parseLong(
- properties.getProperty(
- "pipe_progress_index_flush_interval_ms",
-
String.valueOf(config.getPipeProgressIndexFlushIntervalMs()))));
config.setPipeAirGapReceiverEnabled(
Boolean.parseBoolean(
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
index d4e6008547c..8e04baf7cc0 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
@@ -97,27 +97,20 @@ public class PipeMetaDeSerTest {
new PipeRuntimeMeta(
new ConcurrentHashMap<Integer, PipeTaskMeta>() {
{
- put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987,
123, false));
- put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789,
234, false));
- put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789,
345, false));
- put(456, new PipeTaskMeta(finalHybridProgressIndex, 789, 456,
false));
+ put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987));
+ put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789));
+ put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789));
+ put(456, new PipeTaskMeta(finalHybridProgressIndex, 789));
put(
567,
new PipeTaskMeta(
- new RecoverProgressIndex(1, new SimpleProgressIndex(1,
9)),
- 123,
- 567,
- false));
+ new RecoverProgressIndex(1, new SimpleProgressIndex(1,
9)), 123));
put(
678,
new PipeTaskMeta(
new
TimeWindowStateProgressIndex(timeSeries2TimestampWindowBufferPairMap),
- 789,
- 678,
- false));
- put(
- Integer.MIN_VALUE,
- new PipeTaskMeta(new MetaProgressIndex(987), 0,
Integer.MIN_VALUE, false));
+ 789));
+ put(Integer.MIN_VALUE, new PipeTaskMeta(new
MetaProgressIndex(987), 0));
}
});
ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
@@ -136,7 +129,6 @@ public class PipeMetaDeSerTest {
Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1);
pipeRuntimeMeta.getStatus().set(PipeStatus.DROPPED);
- pipeRuntimeMeta.onSetPipeDroppedOrStopped();
pipeRuntimeMeta.setIsStoppedByRuntimeException(true);
pipeRuntimeMeta.setExceptionsClearTime(0);
pipeRuntimeMeta