This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new a07ba5c7229 [To rel/1.2][IOTDB-5931] Pipe: async execute pipeHandleLeaderChange and pipeHandleMetaChange to avoid causing heartbeats to timeout (#10204)
a07ba5c7229 is described below
commit a07ba5c7229d819fe0f9f17e9e79dd43c3c4df40
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jun 19 09:03:27 2023 +0800
[To rel/1.2][IOTDB-5931] Pipe: async execute pipeHandleLeaderChange and
pipeHandleMetaChange to avoid causing heartbeats to timeout (#10204)
---
.../iotdb/confignode/manager/ProcedureManager.java | 48 ++--
.../pipe/runtime/PipeHeartbeatParser.java} | 246 ++++++---------------
...ordinator.java => PipeLeaderChangeHandler.java} | 58 +----
.../manager/pipe/runtime/PipeMetaSyncer.java | 4 +-
.../pipe/runtime/PipeRuntimeCoordinator.java | 91 ++------
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 174 +--------------
.../runtime/PipeHandleMetaChangeProcedureTest.java | 7 +-
.../iotdb/commons/concurrent/ThreadName.java | 18 +-
.../pipe/resource/wal/PipeWALResourceManager.java | 2 +-
10 files changed, 131 insertions(+), 519 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index fc84b840245..a0cf05716a1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -94,7 +94,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -834,51 +833,36 @@ public class ProcedureManager {
}
}
- public TSStatus pipeHandleLeaderChange(
+ public void pipeHandleLeaderChange(
Map<TConsensusGroupId, Pair<Integer, Integer>>
dataRegionGroupToOldAndNewLeaderPairMap) {
try {
- long procedureId =
+ final long procedureId =
executor.submitProcedure(
new
PipeHandleLeaderChangeProcedure(dataRegionGroupToOldAndNewLeaderPairMap));
- List<TSStatus> statusList = new ArrayList<>();
- boolean isSucceed =
- waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
- if (isSucceed) {
- return RpcUtils.SUCCESS_STATUS;
- } else {
- return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
- }
+ LOGGER.info("PipeHandleLeaderChangeProcedure was submitted, procedureId: {}.", procedureId);
} catch (Exception e) {
- return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ LOGGER.warn("PipeHandleLeaderChangeProcedure was failed to submit.", e);
}
}
- public TSStatus pipeMetaSync() {
+ public void pipeHandleMetaChange(
+ boolean needWriteConsensusOnConfigNodes, boolean
needPushPipeMetaToDataNodes) {
try {
- long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure());
- List<TSStatus> statusList = new ArrayList<>();
- boolean isSucceed =
- waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
- if (isSucceed) {
- return RpcUtils.SUCCESS_STATUS;
- } else {
- return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
- .setMessage(statusList.get(0).getMessage());
- }
+ final long procedureId =
+ executor.submitProcedure(
+ new PipeHandleMetaChangeProcedure(
+ needWriteConsensusOnConfigNodes,
needPushPipeMetaToDataNodes));
+ LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId: {}.", procedureId);
} catch (Exception e) {
- return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e);
}
}
- public TSStatus pipeHandleMetaChange(
- int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
+ public TSStatus pipeMetaSync() {
try {
- long procedureId =
- executor.submitProcedure(
- new PipeHandleMetaChangeProcedure(dataNodeId,
pipeMetaByteBufferListFromDataNode));
- List<TSStatus> statusList = new ArrayList<>();
- boolean isSucceed =
+ final long procedureId = executor.submitProcedure(new
PipeMetaSyncProcedure());
+ final List<TSStatus> statusList = new ArrayList<>();
+ final boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
if (isSucceed) {
return RpcUtils.SUCCESS_STATUS;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
similarity index 51%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
index 435b3e0b41f..537a5e7d1cc 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+package org.apache.iotdb.confignode.manager.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
@@ -27,70 +27,91 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
-import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
-import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.confignode.manager.ConfigManager;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
-public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV2 {
+public class PipeHeartbeatParser {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHandleMetaChangeProcedure.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHeartbeatParser.class);
- private int dataNodeId;
- private final List<ByteBuffer> pipeMetaByteBufferListFromDataNode;
+ private final ConfigManager configManager;
- private boolean needWriteConsensusOnConfigNodes = false;
- private boolean needPushPipeMetaToDataNodes = false;
+ private long heartbeatCounter;
+ private int registeredDataNodeNumber;
- public PipeHandleMetaChangeProcedure() {
- super();
- pipeMetaByteBufferListFromDataNode = new ArrayList<>();
+ private final AtomicBoolean needWriteConsensusOnConfigNodes;
+ private final AtomicBoolean needPushPipeMetaToDataNodes;
+
+ PipeHeartbeatParser(ConfigManager configManager) {
+ this.configManager = configManager;
+
+ heartbeatCounter = 0;
+ registeredDataNodeNumber = 1;
+
+ needWriteConsensusOnConfigNodes = new AtomicBoolean(false);
+ needPushPipeMetaToDataNodes = new AtomicBoolean(false);
}
- public PipeHandleMetaChangeProcedure(
+ public synchronized void parseHeartbeat(
int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
- super();
- this.dataNodeId = dataNodeId;
- this.pipeMetaByteBufferListFromDataNode =
pipeMetaByteBufferListFromDataNode;
- needWriteConsensusOnConfigNodes = false;
- needPushPipeMetaToDataNodes = false;
- }
+ final long heartbeatCount = ++heartbeatCounter;
+
+ final AtomicBoolean canSubmitHandleMetaChangeProcedure = new
AtomicBoolean(false);
+ // registeredDataNodeNumber can not be 0 when the method is called
+ if (heartbeatCount % registeredDataNodeNumber == 0) {
+ canSubmitHandleMetaChangeProcedure.set(true);
+
+ // registeredDataNodeNumber may be changed, update it here when we can submit procedure
+ registeredDataNodeNumber =
configManager.getNodeManager().getRegisteredDataNodeCount();
+ if (registeredDataNodeNumber <= 0) {
+ LOGGER.warn(
+ "registeredDataNodeNumber is {} when parseHeartbeat from data node (id={}).",
+ registeredDataNodeNumber,
+ dataNodeId);
+ // registeredDataNodeNumber can not be set to 0 in this class, otherwise may cause
+ // DivideByZeroException
+ registeredDataNodeNumber = 1;
+ }
+ }
- @Override
- protected PipeTaskOperation getOperation() {
- return PipeTaskOperation.HANDLE_PIPE_META_CHANGE;
- }
+ if (pipeMetaByteBufferListFromDataNode.isEmpty()
+ && !(canSubmitHandleMetaChangeProcedure.get()
+ && (needWriteConsensusOnConfigNodes.get() ||
needPushPipeMetaToDataNodes.get()))) {
+ return;
+ }
- @Override
- protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask");
+ PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit(
+ () -> {
+ if (!pipeMetaByteBufferListFromDataNode.isEmpty()) {
+ parseHeartbeatAndSaveMetaChangeLocally(dataNodeId,
pipeMetaByteBufferListFromDataNode);
+ }
- // do nothing
- }
+ if (canSubmitHandleMetaChangeProcedure.get()
+ && (needWriteConsensusOnConfigNodes.get() ||
needPushPipeMetaToDataNodes.get())) {
+ configManager
+ .getProcedureManager()
+ .pipeHandleMetaChange(
+ needWriteConsensusOnConfigNodes.get(),
needPushPipeMetaToDataNodes.get());
- @Override
- protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeHandleMetaChangeProcedure:
executeFromCalculateInfoForTask");
+ // reset flags after procedure is submitted
+ needWriteConsensusOnConfigNodes.set(false);
+ needPushPipeMetaToDataNodes.set(false);
+ }
+ });
+ }
+ private void parseHeartbeatAndSaveMetaChangeLocally(
+ int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromDataNode = new
HashMap<>();
for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromDataNode) {
final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
@@ -98,7 +119,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
}
for (final PipeMeta pipeMetaOnConfigNode :
- env.getConfigManager()
+ configManager
.getPipeManager()
.getPipeTaskCoordinator()
.getPipeTaskInfo()
@@ -152,7 +173,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
.getValue()
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
- needWriteConsensusOnConfigNodes = true;
+ needWriteConsensusOnConfigNodes.set(true);
}
// update runtime exception
@@ -172,8 +193,8 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
.equals(PipeStatus.STOPPED)) {
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
- needWriteConsensusOnConfigNodes = true;
- needPushPipeMetaToDataNodes = true;
+ needWriteConsensusOnConfigNodes.set(true);
+ needPushPipeMetaToDataNodes.set(true);
LOGGER.warn(
String.format(
@@ -183,7 +204,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
if (exception instanceof PipeRuntimeConnectorCriticalException) {
((PipeTableResp)
- env.getConfigManager()
+ configManager
.getPipeManager()
.getPipeTaskCoordinator()
.getPipeTaskInfo()
@@ -195,8 +216,8 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
status -> {
status.set(PipeStatus.STOPPED);
- needWriteConsensusOnConfigNodes = true;
- needPushPipeMetaToDataNodes = true;
+ needWriteConsensusOnConfigNodes.set(true);
+ needPushPipeMetaToDataNodes.set(true);
LOGGER.warn(
String.format(
@@ -209,131 +230,4 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
}
}
}
-
- @Override
- protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv
env) {
- LOGGER.info("PipeHandleMetaChangeProcedure:
executeFromWriteConfigNodeConsensus");
-
- if (!needWriteConsensusOnConfigNodes) {
- return;
- }
-
- final List<PipeMeta> pipeMetaList = new ArrayList<>();
- for (final PipeMeta pipeMeta :
- env.getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .getPipeTaskInfo()
- .getPipeMetaList()) {
- pipeMetaList.add(pipeMeta);
- }
-
- final ConsensusWriteResponse response =
- env.getConfigManager()
- .getConsensusManager()
- .write(new PipeHandleMetaChangePlan(pipeMetaList));
- if (!response.isSuccessful()) {
- throw new PipeException(response.getErrorMessage());
- }
- }
-
- @Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeHandleMetaChangeProcedure: executeFromHandleOnDataNodes");
-
- if (!needPushPipeMetaToDataNodes) {
- return;
- }
-
- pushPipeMetaToDataNodesIgnoreException(env);
- }
-
- @Override
- protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromValidateTask");
-
- // do nothing
- }
-
- @Override
- protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeHandleMetaChangeProcedure:
rollbackFromCalculateInfoForTask");
-
- // do nothing
- }
-
- @Override
- protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv
env) {
- LOGGER.info("PipeHandleMetaChangeProcedure:
rollbackFromWriteConfigNodeConsensus");
-
- // do nothing
- }
-
- @Override
- protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeHandleMetaChangeProcedure:
rollbackFromOperateOnDataNodes");
-
- // do nothing
- }
-
- @Override
- public void serialize(DataOutputStream stream) throws IOException {
-
stream.writeShort(ProcedureType.PIPE_HANDLE_META_CHANGE_PROCEDURE.getTypeCode());
- super.serialize(stream);
-
- ReadWriteIOUtils.write(dataNodeId, stream);
-
- ReadWriteIOUtils.write(pipeMetaByteBufferListFromDataNode.size(), stream);
- for (ByteBuffer pipeMetaByteBuffer : pipeMetaByteBufferListFromDataNode) {
- ReadWriteIOUtils.write(pipeMetaByteBuffer.limit(), stream);
- ReadWriteIOUtils.write(new Binary(pipeMetaByteBuffer.array()), stream);
- }
-
- ReadWriteIOUtils.write(needWriteConsensusOnConfigNodes, stream);
- ReadWriteIOUtils.write(needPushPipeMetaToDataNodes, stream);
- }
-
- @Override
- public void deserialize(ByteBuffer byteBuffer) {
- super.deserialize(byteBuffer);
-
- dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
-
- final int size = ReadWriteIOUtils.readInt(byteBuffer);
- for (int i = 0; i < size; ++i) {
- final int limit = ReadWriteIOUtils.readInt(byteBuffer);
- final ByteBuffer pipeMetaByteBuffer =
- ByteBuffer.wrap(ReadWriteIOUtils.readBinary(byteBuffer).getValues());
- pipeMetaByteBuffer.limit(limit);
- pipeMetaByteBufferListFromDataNode.add(pipeMetaByteBuffer);
- }
-
- needWriteConsensusOnConfigNodes = ReadWriteIOUtils.readBool(byteBuffer);
- needPushPipeMetaToDataNodes = ReadWriteIOUtils.readBool(byteBuffer);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof PipeHandleMetaChangeProcedure)) {
- return false;
- }
- PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
- return dataNodeId == that.dataNodeId
- && needWriteConsensusOnConfigNodes ==
that.needWriteConsensusOnConfigNodes
- && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes
- && Objects.equals(
- pipeMetaByteBufferListFromDataNode,
that.pipeMetaByteBufferListFromDataNode);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- dataNodeId,
- pipeMetaByteBufferListFromDataNode,
- needWriteConsensusOnConfigNodes,
- needPushPipeMetaToDataNodes);
- }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
similarity index 63%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
index 00182aac69d..3f93fb022b9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
@@ -21,35 +21,22 @@ package org.apache.iotdb.confignode.manager.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.manager.ConfigManager;
import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
import
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRuntimeCoordinator.class);
+public class PipeLeaderChangeHandler implements IClusterStatusSubscriber {
private final ConfigManager configManager;
- private final PipeMetaSyncer pipeMetaSyncer;
-
- public PipeRuntimeCoordinator(ConfigManager configManager) {
+ PipeLeaderChangeHandler(ConfigManager configManager) {
this.configManager = configManager;
- this.pipeMetaSyncer = new PipeMetaSyncer(configManager);
}
@Override
@@ -90,40 +77,11 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
return;
}
- final TSStatus result =
- configManager
- .getProcedureManager()
- .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap);
- if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
- "PipeRuntimeCoordinator meets error in handling data region leader
change, status: ({})",
- result);
- }
- }
-
- public void startPipeMetaSync() {
- pipeMetaSyncer.start();
- }
-
- public void stopPipeMetaSync() {
- pipeMetaSyncer.stop();
- }
-
- /**
- * parse heartbeat from data node.
- *
- * @param dataNodeId data node id
- * @param pipeMetaByteBufferListFromDataNode pipe meta byte buffer list
collected from data node
- */
- public void parseHeartbeat(
- int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
- final TSStatus result =
- configManager
- .getProcedureManager()
- .pipeHandleMetaChange(dataNodeId,
pipeMetaByteBufferListFromDataNode);
- if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
- "PipeTaskCoordinator meets error in handling pipe meta change,
status: ({})", result);
- }
+ // submit procedure in an async way to avoid blocking the caller
+ PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit(
+ () ->
+ configManager
+ .getProcedureManager()
+
.pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap));
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index 2251d0d7892..715dabe2a46 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -40,7 +40,7 @@ public class PipeMetaSyncer {
private static final ScheduledExecutorService SYNC_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.PIPE_META_SYNC_SERVICE.getName());
+ ThreadName.PIPE_RUNTIME_META_SYNCER.getName());
private static final long INITIAL_SYNC_DELAY_MINUTES =
PipeConfig.getInstance().getPipeMetaSyncerInitialSyncDelayMinutes();
private static final long SYNC_INTERVAL_MINUTES =
@@ -50,7 +50,7 @@ public class PipeMetaSyncer {
private Future<?> metaSyncFuture;
- public PipeMetaSyncer(ConfigManager configManager) {
+ PipeMetaSyncer(ConfigManager configManager) {
this.configManager = configManager;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 00182aac69d..23398038b55 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -19,86 +19,49 @@
package org.apache.iotdb.confignode.manager.pipe.runtime;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.confignode.manager.ConfigManager;
import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
import
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
-import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ExecutorService;
public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRuntimeCoordinator.class);
-
- private final ConfigManager configManager;
+ // shared thread pool in the runtime package
+ static final ExecutorService PROCEDURE_SUBMITTER =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName());
+ private final PipeLeaderChangeHandler pipeLeaderChangeHandler;
+ private final PipeHeartbeatParser pipeHeartbeatParser;
private final PipeMetaSyncer pipeMetaSyncer;
public PipeRuntimeCoordinator(ConfigManager configManager) {
- this.configManager = configManager;
- this.pipeMetaSyncer = new PipeMetaSyncer(configManager);
+ pipeLeaderChangeHandler = new PipeLeaderChangeHandler(configManager);
+ pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
+ pipeMetaSyncer = new PipeMetaSyncer(configManager);
}
@Override
public void onClusterStatisticsChanged(StatisticsChangeEvent event) {
- // do nothing, because pipe task is not related to statistics
+ pipeLeaderChangeHandler.onClusterStatisticsChanged(event);
}
@Override
public void onRegionGroupLeaderChanged(RouteChangeEvent event) {
- // if no pipe task, return
- if
(configManager.getPipeManager().getPipeTaskCoordinator().getPipeTaskInfo().isEmpty())
{
- return;
- }
-
- // we only care about data region leader change
- final Map<TConsensusGroupId, Pair<Integer, Integer>>
dataRegionGroupToOldAndNewLeaderPairMap =
- new HashMap<>();
- event
- .getLeaderMap()
- .forEach(
- (regionGroupId, pair) -> {
- if
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
- final String databaseName =
-
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
- if (databaseName != null &&
!databaseName.equals(IoTDBMetricsUtils.DATABASE)) {
- // pipe only collect user's data, filter metric database
here.
- dataRegionGroupToOldAndNewLeaderPairMap.put(
- regionGroupId,
- new Pair<>( // null or -1 means empty origin leader
- pair.left == null ? -1 : pair.left,
- pair.right == null ? -1 : pair.right));
- }
- }
- });
-
- // if no data region leader change, return
- if (dataRegionGroupToOldAndNewLeaderPairMap.isEmpty()) {
- return;
- }
+ pipeLeaderChangeHandler.onRegionGroupLeaderChanged(event);
+ }
- final TSStatus result =
- configManager
- .getProcedureManager()
- .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap);
- if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
- "PipeRuntimeCoordinator meets error in handling data region leader
change, status: ({})",
- result);
- }
+ public void parseHeartbeat(
+ int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
+ pipeHeartbeatParser.parseHeartbeat(dataNodeId,
pipeMetaByteBufferListFromDataNode);
}
public void startPipeMetaSync() {
@@ -108,22 +71,4 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
public void stopPipeMetaSync() {
pipeMetaSyncer.stop();
}
-
- /**
- * parse heartbeat from data node.
- *
- * @param dataNodeId data node id
- * @param pipeMetaByteBufferListFromDataNode pipe meta byte buffer list
collected from data node
- */
- public void parseHeartbeat(
- int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
- final TSStatus result =
- configManager
- .getProcedureManager()
- .pipeHandleMetaChange(dataNodeId,
pipeMetaByteBufferListFromDataNode);
- if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
- "PipeTaskCoordinator meets error in handling pipe meta change,
status: ({})", result);
- }
- }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index e48901b7824..27a9fd1de25 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -51,7 +51,7 @@ public abstract class AbstractOperatePipeProcedureV2
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class);
- private static final int RETRY_THRESHOLD = 3;
+ private static final int RETRY_THRESHOLD = 1;
// only used in rollback to reduce the number of network calls
protected boolean isRollbackFromOperateOnDataNodesSuccessful = false;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 435b3e0b41f..d7e9971dbb9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -19,26 +19,16 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
-import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
-import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
-import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,33 +36,25 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV2 {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHandleMetaChangeProcedure.class);
- private int dataNodeId;
- private final List<ByteBuffer> pipeMetaByteBufferListFromDataNode;
-
private boolean needWriteConsensusOnConfigNodes = false;
private boolean needPushPipeMetaToDataNodes = false;
public PipeHandleMetaChangeProcedure() {
super();
- pipeMetaByteBufferListFromDataNode = new ArrayList<>();
}
public PipeHandleMetaChangeProcedure(
- int dataNodeId, @NotNull List<ByteBuffer>
pipeMetaByteBufferListFromDataNode) {
+ boolean needWriteConsensusOnConfigNodes, boolean
needPushPipeMetaToDataNodes) {
super();
- this.dataNodeId = dataNodeId;
- this.pipeMetaByteBufferListFromDataNode =
pipeMetaByteBufferListFromDataNode;
- needWriteConsensusOnConfigNodes = false;
- needPushPipeMetaToDataNodes = false;
+ this.needWriteConsensusOnConfigNodes = needWriteConsensusOnConfigNodes;
+ this.needPushPipeMetaToDataNodes = needPushPipeMetaToDataNodes;
}
@Override
@@ -91,123 +73,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleMetaChangeProcedure:
executeFromCalculateInfoForTask");
- final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromDataNode = new
HashMap<>();
- for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromDataNode) {
- final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
- pipeMetaMapFromDataNode.put(pipeMeta.getStaticMeta(), pipeMeta);
- }
-
- for (final PipeMeta pipeMetaOnConfigNode :
- env.getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .getPipeTaskInfo()
- .getPipeMetaList()) {
- final PipeMeta pipeMetaFromDataNode =
- pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta());
- if (pipeMetaFromDataNode == null) {
- LOGGER.info(
- "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
- + "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}",
- pipeMetaOnConfigNode);
- continue;
- }
-
- final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapOnConfigNode =
-
pipeMetaOnConfigNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
- final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapFromDataNode =
-
pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
- for (final Map.Entry<TConsensusGroupId, PipeTaskMeta>
runtimeMetaOnConfigNode :
- pipeTaskMetaMapOnConfigNode.entrySet()) {
- if (runtimeMetaOnConfigNode.getValue().getLeaderDataNodeId() !=
dataNodeId) {
- continue;
- }
-
- final PipeTaskMeta runtimeMetaFromDataNode =
- pipeTaskMetaMapFromDataNode.get(runtimeMetaOnConfigNode.getKey());
- if (runtimeMetaFromDataNode == null) {
- LOGGER.warn(
- "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
- + "runtimeMetaFromDataNode is null, runtimeMetaOnConfigNode:
{}",
- runtimeMetaOnConfigNode);
- continue;
- }
-
- // update progress index
- if (!runtimeMetaOnConfigNode
- .getValue()
- .getProgressIndex()
- .isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
- LOGGER.info(
- "Updating progress index for (pipe name: {}, consensus group id:
{}) ... Progress index on config node: {}, progress index from data node: {}",
- pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
- runtimeMetaOnConfigNode.getKey(),
- runtimeMetaOnConfigNode.getValue().getProgressIndex(),
- runtimeMetaFromDataNode.getProgressIndex());
- LOGGER.info(
- "Progress index for (pipe name: {}, consensus group id: {}) is
updated to {}",
- pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
- runtimeMetaOnConfigNode.getKey(),
- runtimeMetaOnConfigNode
- .getValue()
-
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
-
- needWriteConsensusOnConfigNodes = true;
- }
-
- // update runtime exception
- final PipeTaskMeta pipeTaskMetaOnConfigNode =
runtimeMetaOnConfigNode.getValue();
- pipeTaskMetaOnConfigNode.clearExceptionMessages();
- for (final PipeRuntimeException exception :
- runtimeMetaFromDataNode.getExceptionMessages()) {
-
- pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
-
- if (exception instanceof PipeRuntimeCriticalException) {
- final String pipeName =
pipeMetaOnConfigNode.getStaticMeta().getPipeName();
- if (!pipeMetaOnConfigNode
- .getRuntimeMeta()
- .getStatus()
- .get()
- .equals(PipeStatus.STOPPED)) {
-
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
-
- needWriteConsensusOnConfigNodes = true;
- needPushPipeMetaToDataNodes = true;
-
- LOGGER.warn(
- String.format(
- "Detect PipeRuntimeCriticalException %s from DataNode,
stop pipe %s.",
- exception, pipeName));
- }
-
- if (exception instanceof PipeRuntimeConnectorCriticalException) {
- ((PipeTableResp)
- env.getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .getPipeTaskInfo()
- .showPipes())
- .filter(true, pipeName).getAllPipeMeta().stream()
- .map(pipeMeta -> pipeMeta.getRuntimeMeta().getStatus())
- .filter(status ->
!status.get().equals(PipeStatus.STOPPED))
- .forEach(
- status -> {
- status.set(PipeStatus.STOPPED);
-
- needWriteConsensusOnConfigNodes = true;
- needPushPipeMetaToDataNodes = true;
-
- LOGGER.warn(
- String.format(
- "Detect
PipeRuntimeConnectorCriticalException %s from DataNode, stop pipe %s.",
- exception, pipeName));
- });
- }
- }
- }
- }
- }
+ // do nothing
}
@Override
@@ -281,14 +147,6 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
stream.writeShort(ProcedureType.PIPE_HANDLE_META_CHANGE_PROCEDURE.getTypeCode());
super.serialize(stream);
- ReadWriteIOUtils.write(dataNodeId, stream);
-
- ReadWriteIOUtils.write(pipeMetaByteBufferListFromDataNode.size(), stream);
- for (ByteBuffer pipeMetaByteBuffer : pipeMetaByteBufferListFromDataNode) {
- ReadWriteIOUtils.write(pipeMetaByteBuffer.limit(), stream);
- ReadWriteIOUtils.write(new Binary(pipeMetaByteBuffer.array()), stream);
- }
-
ReadWriteIOUtils.write(needWriteConsensusOnConfigNodes, stream);
ReadWriteIOUtils.write(needPushPipeMetaToDataNodes, stream);
}
@@ -297,17 +155,6 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
- dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
-
- final int size = ReadWriteIOUtils.readInt(byteBuffer);
- for (int i = 0; i < size; ++i) {
- final int limit = ReadWriteIOUtils.readInt(byteBuffer);
- final ByteBuffer pipeMetaByteBuffer =
- ByteBuffer.wrap(ReadWriteIOUtils.readBinary(byteBuffer).getValues());
- pipeMetaByteBuffer.limit(limit);
- pipeMetaByteBufferListFromDataNode.add(pipeMetaByteBuffer);
- }
-
needWriteConsensusOnConfigNodes = ReadWriteIOUtils.readBool(byteBuffer);
needPushPipeMetaToDataNodes = ReadWriteIOUtils.readBool(byteBuffer);
}
@@ -321,19 +168,12 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
return false;
}
PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
- return dataNodeId == that.dataNodeId
- && needWriteConsensusOnConfigNodes ==
that.needWriteConsensusOnConfigNodes
- && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes
- && Objects.equals(
- pipeMetaByteBufferListFromDataNode,
that.pipeMetaByteBufferListFromDataNode);
+ return needWriteConsensusOnConfigNodes ==
that.needWriteConsensusOnConfigNodes
+ && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes;
}
@Override
public int hashCode() {
- return Objects.hash(
- dataNodeId,
- pipeMetaByteBufferListFromDataNode,
- needWriteConsensusOnConfigNodes,
- needPushPipeMetaToDataNodes);
+ return Objects.hash(needWriteConsensusOnConfigNodes,
needPushPipeMetaToDataNodes);
}
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
index 38783ea2e53..47a0dca1abd 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -34,7 +33,6 @@ import org.junit.Test;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
@@ -78,10 +76,7 @@ public class PipeHandleMetaChangeProcedureTest {
}
});
- PipeHandleMetaChangeProcedure proc =
- new PipeHandleMetaChangeProcedure(
- 123,
- Collections.singletonList(new PipeMeta(pipeStaticMeta,
pipeRuntimeMeta).serialize()));
+ PipeHandleMetaChangeProcedure proc = new
PipeHandleMetaChangeProcedure(true, false);
try {
proc.serialize(outputStream);
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 094a722f75f..7170d4a4e4c 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -99,17 +99,15 @@ public enum ThreadName {
GRPC_DEFAULT_EXECUTOR("grpc-default-executor"),
GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
GROUP_MANAGEMENT("groupManagement"),
-
// -------------------------- Compute --------------------------
PIPE_ASSIGNER_EXECUTOR_POOL("Pipe-Assigner-Executor-Pool"),
PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
- EXT_PIPE_PLUGIN_WORKER("ExtPipePlugin-Worker"),
+ PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
+ PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
+ PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
- // -------------------------- Sync --------------------------
- SYNC_SENDER_PIPE("Sync-Pipe"),
- SYNC_SENDER_HEARTBEAT("Sync-Heartbeat"),
// -------------------------- JVM --------------------------
// NOTICE: The thread name of jvm cannot be edited here!
// We list the thread name here just for distinguishing what module the
thread belongs to.
@@ -144,8 +142,6 @@ public enum ThreadName {
MLNODE_RPC_SERVICE("MLNodeRpc-Service"),
IOTDB_SHUTDOWN_HOOK("IoTDB-Shutdown-Hook"),
STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
- PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
-
PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE("Pipe-WAL-Resource-TTL-Checker-Service"),
// the unknown thread name is used for metrics
UNKOWN("UNKNOWN");
@@ -231,10 +227,11 @@ public enum ThreadName {
PIPE_PROCESSOR_EXECUTOR_POOL,
PIPE_CONNECTOR_EXECUTOR_POOL,
PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL,
- EXT_PIPE_PLUGIN_WORKER,
+ PIPE_RUNTIME_META_SYNCER,
+ PIPE_RUNTIME_PROCEDURE_SUBMITTER,
+ PIPE_WAL_RESOURCE_TTL_CHECKER,
WINDOW_EVALUATION_SERVICE));
- private static Set<ThreadName> syncThreadNames =
- new HashSet<>(Arrays.asList(SYNC_SENDER_PIPE, SYNC_SENDER_HEARTBEAT));
+
private static Set<ThreadName> jvmThreadNames =
new HashSet<>(
Arrays.asList(
@@ -292,7 +289,6 @@ public enum ThreadName {
iotConsensusThrreadNames,
ratisThreadNames,
computeThreadNames,
- syncThreadNames,
jvmThreadNames,
metricsThreadNames,
otherThreadNames
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index cadb4746984..de67bb8543f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -22,7 +22,7 @@ public class PipeWALResourceManager implements AutoCloseable {
private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE.getName());
+ ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
private final ScheduledFuture<?> ttlCheckerFuture;
public PipeWALResourceManager() {