This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b833e3b66d1 [IOTDB-5895][IOTDB-5904] Pipe: handling DataNode removal (#9881)
b833e3b66d1 is described below
commit b833e3b66d1ce2ea21b5f58c9a5d3b4eba6292cd
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 22 01:29:08 2023 +0800
[IOTDB-5895][IOTDB-5904] Pipe: handling DataNode removal (#9881)
- [IOTDB-5895] Added logic for handling DataNode removal in Pipe
- [IOTDB-5904] CI fails on IoTDBClusterRestartIT.clusterRestartTest
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../apache/iotdb/commons/service/ServiceType.java | 1 +
.../iotdb/db/pipe/agent/runtime/PipeLauncher.java | 18 +-
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 43 ++-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 350 +++++++++++----------
.../execution/executor/PipeSubtaskExecutor.java | 2 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 10 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 6 +-
7 files changed, 244 insertions(+), 186 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 94d524defcf..7bde2af3102 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -75,6 +75,7 @@ public enum ServiceType {
IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
"Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"),
+ PIPE_RUNTIME_AGENT("Pipe Runtime Agent", "PipeRuntimeAgent"),
MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService");
private final String name;
private final String jmxName;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
index 2d0d45ebc02..30ab6aee862 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
@@ -46,12 +46,16 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
-public class PipeLauncher {
+class PipeLauncher {
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- public void launchPipePluginAgent(ResourcesInformationHolder
resourcesInformationHolder)
- throws StartupException {
+ private PipeLauncher() {
+ // forbidding instantiation
+ }
+
+ public static synchronized void launchPipePluginAgent(
+ ResourcesInformationHolder resourcesInformationHolder) throws
StartupException {
initPipePluginRelatedInstances();
if (resourcesInformationHolder.getPipePluginMetaList() == null
@@ -87,7 +91,7 @@ public class PipeLauncher {
}
}
- private void initPipePluginRelatedInstances() throws StartupException {
+ private static void initPipePluginRelatedInstances() throws StartupException
{
try {
PipePluginExecutableManager.setupAndGetInstance(
IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
@@ -97,7 +101,7 @@ public class PipeLauncher {
}
}
- private List<PipePluginMeta> getUninstalledOrConflictedPipePluginMetaList(
+ private static List<PipePluginMeta>
getUninstalledOrConflictedPipePluginMetaList(
ResourcesInformationHolder resourcesInformationHolder) {
final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
for (PipePluginMeta pipePluginMeta :
resourcesInformationHolder.getPipePluginMetaList()) {
@@ -122,7 +126,7 @@ public class PipeLauncher {
return pipePluginMetaList;
}
- private void fetchAndSavePipePluginJars(List<PipePluginMeta>
pipePluginMetaList)
+ private static void fetchAndSavePipePluginJars(List<PipePluginMeta>
pipePluginMetaList)
throws StartupException {
try (ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
@@ -143,7 +147,7 @@ public class PipeLauncher {
}
}
- public void launchPipeTaskAgent() throws StartupException {
+ public static synchronized void launchPipeTaskAgent() throws
StartupException {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
final TGetAllPipeInfoResp getAllPipeInfoResp =
configNodeClient.getAllPipeInfo();
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2d85aac9e8c..5253d5a2db2 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.pipe.agent.runtime;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
@@ -27,15 +30,43 @@ import
org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PipeRuntimeAgent {
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeRuntimeAgent implements IService {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRuntimeAgent.class);
- public synchronized void launch(ResourcesInformationHolder
resourcesInformationHolder)
- throws StartupException {
- final PipeLauncher pipeLauncher = new PipeLauncher();
- pipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
- pipeLauncher.launchPipeTaskAgent();
+ private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ public synchronized void launchPipePluginAgent(
+ ResourcesInformationHolder resourcesInformationHolder) throws
StartupException {
+ PipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
+ }
+
+ @Override
+ public synchronized void start() throws StartupException {
+ PipeLauncher.launchPipeTaskAgent();
+
+ isShutdown.set(false);
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (isShutdown.get()) {
+ return;
+ }
+ isShutdown.set(true);
+
+ PipeAgent.task().dropAllPipeTasks();
+ }
+
+ public boolean isShutdown() {
+ return isShutdown.get();
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.PIPE_RUNTIME_AGENT;
}
public void report(PipeSubtask subtask) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 04a3c6a05a1..e231d818dbf 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.task.PipeBuilder;
import org.apache.iotdb.db.pipe.task.PipeTask;
import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
@@ -67,9 +68,184 @@ public class PipeTaskAgent {
pipeTaskManager = new PipeTaskManager();
}
- ////////////////////////// Pipe Task Management //////////////////////////
+ ////////////////////////// Pipe Task Management Entry
//////////////////////////
- public synchronized void createPipe(PipeMeta pipeMeta) {
+ // TODO: handle progress index
+ public synchronized void handlePipeMetaChanges(List<PipeMeta>
pipeMetaListFromConfigNode) {
+ // do nothing if data node is removing or removed
+ if (PipeAgent.runtime().isShutdown()) {
+ return;
+ }
+
+ // iterate through pipe meta list from config node, check if pipe meta
exists on data node
+ // or has changed
+ for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
+ final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
+
+ final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ // if pipe meta does not exist on data node, create a new pipe
+ if (metaOnDataNode == null) {
+ createPipe(metaFromConfigNode);
+ if (metaFromConfigNode.getRuntimeMeta().getStatus().get() ==
PipeStatus.RUNNING) {
+ startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
+ }
+ // if the status is STOPPED or DROPPED, do nothing
+ continue;
+ }
+
+ // if pipe meta exists on data node, check if it has changed
+ final PipeStaticMeta staticMetaOnDataNode =
metaOnDataNode.getStaticMeta();
+ final PipeStaticMeta staticMetaFromConfigNode =
metaFromConfigNode.getStaticMeta();
+
+ // first check if pipe static meta has changed, if so, drop the pipe and
create a new one
+ if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
+ dropPipe(pipeName);
+ createPipe(metaFromConfigNode);
+ if (metaFromConfigNode.getRuntimeMeta().getStatus().get() ==
PipeStatus.RUNNING) {
+ startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
+ }
+ // if the status is STOPPED or DROPPED, do nothing
+ continue;
+ }
+
+ // then check if pipe runtime meta has changed, if so, update the pipe
+ final PipeRuntimeMeta runtimeMetaOnDataNode =
metaOnDataNode.getRuntimeMeta();
+ final PipeRuntimeMeta runtimeMetaFromConfigNode =
metaFromConfigNode.getRuntimeMeta();
+ handlePipeRuntimeMetaChanges(
+ staticMetaFromConfigNode, runtimeMetaFromConfigNode,
runtimeMetaOnDataNode);
+ }
+
+ // check if there are pipes on data node that do not exist on config node,
if so, drop them
+ final Set<String> pipeNamesFromConfigNode =
+ pipeMetaListFromConfigNode.stream()
+ .map(meta -> meta.getStaticMeta().getPipeName())
+ .collect(Collectors.toSet());
+ for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
+ if
(!pipeNamesFromConfigNode.contains(metaOnDataNode.getStaticMeta().getPipeName()))
{
+ dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
+ }
+ }
+ }
+
+ private void handlePipeRuntimeMetaChanges(
+ @NotNull PipeStaticMeta pipeStaticMeta,
+ @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
+ @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
+ // 1. handle data region group leader changed first
+ final Map<TConsensusGroupId, PipeTaskMeta>
consensusGroupIdToTaskMetaMapFromConfigNode =
+ runtimeMetaFromConfigNode.getConsensusGroupIdToTaskMetaMap();
+ final Map<TConsensusGroupId, PipeTaskMeta>
consensusGroupIdToTaskMetaMapOnDataNode =
+ runtimeMetaOnDataNode.getConsensusGroupIdToTaskMetaMap();
+
+ // 1.1 iterate over all consensus group ids in config node's pipe runtime
meta, decide if we
+ // need to drop and create a new task for each consensus group id
+ for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryFromConfigNode :
+ consensusGroupIdToTaskMetaMapFromConfigNode.entrySet()) {
+ final TConsensusGroupId consensusGroupIdFromConfigNode =
entryFromConfigNode.getKey();
+
+ final PipeTaskMeta taskMetaFromConfigNode =
entryFromConfigNode.getValue();
+ final PipeTaskMeta taskMetaOnDataNode =
+
consensusGroupIdToTaskMetaMapOnDataNode.get(consensusGroupIdFromConfigNode);
+
+ // if task meta does not exist on data node, create a new task
+ if (taskMetaOnDataNode == null) {
+ createPipeTask(
+ consensusGroupIdFromConfigNode,
+ pipeStaticMeta,
+ taskMetaFromConfigNode.getProgressIndex(),
+ taskMetaFromConfigNode.getRegionLeader());
+ // we keep the new created task's status consistent with the status
recorded in data node's
+ // pipe runtime meta. please note that the status recorded in data
node's pipe runtime meta
+ // is not reliable, but we will have a check later to make sure the
status is correct.
+ if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
+ startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
+ }
+ continue;
+ }
+
+ // if task meta exists on data node, check if it has changed
+ final int regionLeaderFromConfigNode =
taskMetaFromConfigNode.getRegionLeader();
+ final int regionLeaderOnDataNode = taskMetaOnDataNode.getRegionLeader();
+
+ if (regionLeaderFromConfigNode != regionLeaderOnDataNode) {
+ dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
+ createPipeTask(
+ consensusGroupIdFromConfigNode,
+ pipeStaticMeta,
+ taskMetaFromConfigNode.getProgressIndex(),
+ taskMetaFromConfigNode.getRegionLeader());
+ // we keep the new created task's status consistent with the status
recorded in data node's
+ // pipe runtime meta. please note that the status recorded in data
node's pipe runtime meta
+ // is not reliable, but we will have a check later to make sure the
status is correct.
+ if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
+ startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
+ }
+ }
+ }
+
+ // 1.2 iterate over all consensus group ids on data node's pipe runtime
meta, decide if we need
+ // to drop any task. we do not need to create any new task here because we
have already done
+ // that in 1.1.
+ for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryOnDataNode :
+ consensusGroupIdToTaskMetaMapOnDataNode.entrySet()) {
+ final TConsensusGroupId consensusGroupIdOnDataNode =
entryOnDataNode.getKey();
+ final PipeTaskMeta taskMetaFromConfigNode =
+
consensusGroupIdToTaskMetaMapFromConfigNode.get(consensusGroupIdOnDataNode);
+ if (taskMetaFromConfigNode == null) {
+ dropPipeTask(consensusGroupIdOnDataNode, pipeStaticMeta);
+ }
+ }
+
+ // 2. handle pipe runtime meta status changes
+ final PipeStatus statusFromConfigNode =
runtimeMetaFromConfigNode.getStatus().get();
+ final PipeStatus statusOnDataNode =
runtimeMetaOnDataNode.getStatus().get();
+ if (statusFromConfigNode == statusOnDataNode) {
+ return;
+ }
+
+ switch (statusFromConfigNode) {
+ case RUNNING:
+ if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.STOPPED) {
+ startPipe(pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime());
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Unknown pipe status %s for pipe %s",
+ statusOnDataNode, pipeStaticMeta.getPipeName()));
+ }
+ break;
+ case STOPPED:
+ if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.RUNNING) {
+ stopPipe(pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime());
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Unknown pipe status %s for pipe %s",
+ statusOnDataNode, pipeStaticMeta.getPipeName()));
+ }
+ break;
+ case DROPPED:
+ // this should not happen, but we still handle it here
+ dropPipe(pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime());
+ break;
+ default:
+ throw new IllegalStateException(
+ String.format(
+ "Unknown pipe status %s for pipe %s",
+ statusFromConfigNode, pipeStaticMeta.getPipeName()));
+ }
+ }
+
+ public synchronized void dropAllPipeTasks() {
+ for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+ dropPipe(pipeMeta.getStaticMeta().getPipeName(),
pipeMeta.getStaticMeta().getCreationTime());
+ }
+ }
+
+ ////////////////////////// Manage by Pipe Name //////////////////////////
+
+ private void createPipe(PipeMeta pipeMeta) {
final String pipeName = pipeMeta.getStaticMeta().getPipeName();
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
@@ -120,7 +296,7 @@ public class PipeTaskAgent {
pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
}
- public synchronized void dropPipe(String pipeName, long creationTime) {
+ private void dropPipe(String pipeName, long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta == null) {
@@ -162,7 +338,7 @@ public class PipeTaskAgent {
pipeMetaKeeper.removePipeMeta(pipeName);
}
- public synchronized void dropPipe(String pipeName) {
+ private void dropPipe(String pipeName) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta == null) {
@@ -192,7 +368,7 @@ public class PipeTaskAgent {
pipeMetaKeeper.removePipeMeta(pipeName);
}
- public synchronized void startPipe(String pipeName, long creationTime) {
+ private void startPipe(String pipeName, long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta == null) {
@@ -256,7 +432,7 @@ public class PipeTaskAgent {
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
}
- public synchronized void stopPipe(String pipeName, long creationTime) {
+ private void stopPipe(String pipeName, long creationTime) {
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta == null) {
@@ -320,167 +496,7 @@ public class PipeTaskAgent {
existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}
- // TODO: handle progress index
- public synchronized void handlePipeMetaChanges(List<PipeMeta>
pipeMetaListFromConfigNode) {
- // iterate through pipe meta list from config node, check if pipe meta
exists on data node
- // or has changed
- for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
- final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
-
- final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
-
- // if pipe meta does not exist on data node, create a new pipe
- if (metaOnDataNode == null) {
- createPipe(metaFromConfigNode);
- if (metaFromConfigNode.getRuntimeMeta().getStatus().get() ==
PipeStatus.RUNNING) {
- startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
- }
- // if the status is STOPPED or DROPPED, do nothing
- continue;
- }
-
- // if pipe meta exists on data node, check if it has changed
- final PipeStaticMeta staticMetaOnDataNode =
metaOnDataNode.getStaticMeta();
- final PipeStaticMeta staticMetaFromConfigNode =
metaFromConfigNode.getStaticMeta();
-
- // first check if pipe static meta has changed, if so, drop the pipe and
create a new one
- if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
- dropPipe(pipeName);
- createPipe(metaFromConfigNode);
- if (metaFromConfigNode.getRuntimeMeta().getStatus().get() ==
PipeStatus.RUNNING) {
- startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
- }
- // if the status is STOPPED or DROPPED, do nothing
- continue;
- }
-
- // then check if pipe runtime meta has changed, if so, update the pipe
- final PipeRuntimeMeta runtimeMetaOnDataNode =
metaOnDataNode.getRuntimeMeta();
- final PipeRuntimeMeta runtimeMetaFromConfigNode =
metaFromConfigNode.getRuntimeMeta();
- handlePipeRuntimeMetaChanges(
- staticMetaFromConfigNode, runtimeMetaFromConfigNode,
runtimeMetaOnDataNode);
- }
-
- // check if there are pipes on data node that do not exist on config node,
if so, drop them
- final Set<String> pipeNamesFromConfigNode =
- pipeMetaListFromConfigNode.stream()
- .map(meta -> meta.getStaticMeta().getPipeName())
- .collect(Collectors.toSet());
- for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
- if
(!pipeNamesFromConfigNode.contains(metaOnDataNode.getStaticMeta().getPipeName()))
{
- dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
- }
- }
- }
-
- private void handlePipeRuntimeMetaChanges(
- @NotNull PipeStaticMeta pipeStaticMeta,
- @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
- @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
- // 1. handle data region group leader changed first
- final Map<TConsensusGroupId, PipeTaskMeta>
consensusGroupIdToTaskMetaMapFromConfigNode =
- runtimeMetaFromConfigNode.getConsensusGroupIdToTaskMetaMap();
- final Map<TConsensusGroupId, PipeTaskMeta>
consensusGroupIdToTaskMetaMapOnDataNode =
- runtimeMetaOnDataNode.getConsensusGroupIdToTaskMetaMap();
-
- // 1.1 iterate over all consensus group ids in config node's pipe runtime
meta, decide if we
- // need to drop and create a new task for each consensus group id
- for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryFromConfigNode :
- consensusGroupIdToTaskMetaMapFromConfigNode.entrySet()) {
- final TConsensusGroupId consensusGroupIdFromConfigNode =
entryFromConfigNode.getKey();
-
- final PipeTaskMeta taskMetaFromConfigNode =
entryFromConfigNode.getValue();
- final PipeTaskMeta taskMetaOnDataNode =
-
consensusGroupIdToTaskMetaMapOnDataNode.get(consensusGroupIdFromConfigNode);
-
- // if task meta does not exist on data node, create a new task
- if (taskMetaOnDataNode == null) {
- createPipeTask(
- consensusGroupIdFromConfigNode,
- pipeStaticMeta,
- taskMetaFromConfigNode.getProgressIndex(),
- taskMetaFromConfigNode.getRegionLeader());
- // we keep the new created task's status consistent with the status
recorded in data node's
- // pipe runtime meta. please note that the status recorded in data
node's pipe runtime meta
- // is not reliable, but we will have a check later to make sure the
status is correct.
- if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
- startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
- }
- continue;
- }
-
- // if task meta exists on data node, check if it has changed
- final int regionLeaderFromConfigNode =
taskMetaFromConfigNode.getRegionLeader();
- final int regionLeaderOnDataNode = taskMetaOnDataNode.getRegionLeader();
-
- if (regionLeaderFromConfigNode != regionLeaderOnDataNode) {
- dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
- createPipeTask(
- consensusGroupIdFromConfigNode,
- pipeStaticMeta,
- taskMetaFromConfigNode.getProgressIndex(),
- taskMetaFromConfigNode.getRegionLeader());
- // we keep the new created task's status consistent with the status
recorded in data node's
- // pipe runtime meta. please note that the status recorded in data
node's pipe runtime meta
- // is not reliable, but we will have a check later to make sure the
status is correct.
- if (runtimeMetaOnDataNode.getStatus().get() == PipeStatus.RUNNING) {
- startPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
- }
- }
- }
-
- // 1.2 iterate over all consensus group ids on data node's pipe runtime
meta, decide if we need
- // to drop any task. we do not need to create any new task here because we
have already done
- // that in 1.1.
- for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> entryOnDataNode :
- consensusGroupIdToTaskMetaMapOnDataNode.entrySet()) {
- final TConsensusGroupId consensusGroupIdOnDataNode =
entryOnDataNode.getKey();
- final PipeTaskMeta taskMetaFromConfigNode =
-
consensusGroupIdToTaskMetaMapFromConfigNode.get(consensusGroupIdOnDataNode);
- if (taskMetaFromConfigNode == null) {
- dropPipeTask(consensusGroupIdOnDataNode, pipeStaticMeta);
- }
- }
-
- // 2. handle pipe runtime meta status changes
- final PipeStatus statusFromConfigNode =
runtimeMetaFromConfigNode.getStatus().get();
- final PipeStatus statusOnDataNode =
runtimeMetaOnDataNode.getStatus().get();
- if (statusFromConfigNode == statusOnDataNode) {
- return;
- }
-
- switch (statusFromConfigNode) {
- case RUNNING:
- if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.STOPPED) {
- startPipe(pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime());
- } else {
- throw new IllegalStateException(
- String.format(
- "Unknown pipe status %s for pipe %s",
- statusOnDataNode, pipeStaticMeta.getPipeName()));
- }
- break;
- case STOPPED:
- if (Objects.requireNonNull(statusOnDataNode) == PipeStatus.RUNNING) {
- stopPipe(pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime());
- } else {
- throw new IllegalStateException(
- String.format(
- "Unknown pipe status %s for pipe %s",
- statusOnDataNode, pipeStaticMeta.getPipeName()));
- }
- break;
- case DROPPED:
- // this should not happen, but we still handle it here
- dropPipe(pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime());
- break;
- default:
- throw new IllegalStateException(
- String.format(
- "Unknown pipe status %s for pipe %s",
- statusFromConfigNode, pipeStaticMeta.getPipeName()));
- }
- }
+ ///////////////////////// Manage by dataRegionGroupId
/////////////////////////
private void createPipeTask(
TConsensusGroupId dataRegionGroupId,
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 2952476dbbe..762561546b3 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -58,7 +58,7 @@ public abstract class PipeSubtaskExecutor {
/////////////////////// subtask management ///////////////////////
- public final void register(PipeSubtask subtask) {
+ public final synchronized void register(PipeSubtask subtask) {
if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) {
LOGGER.warn("The subtask {} is already registered.",
subtask.getTaskID());
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 44a5690db8c..db28843d7c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -540,6 +540,8 @@ public class DataNode implements DataNodeMBean {
registerManager.register(RegionMigrateService.getInstance());
registerManager.register(CompactionTaskManager.getInstance());
+
+ registerManager.register(PipeAgent.runtime());
}
/** set up RPC and protocols after DataNode is available */
@@ -834,17 +836,17 @@ public class DataNode implements DataNodeMBean {
}
private void preparePipeResources() throws StartupException {
- PipeAgent.runtime().launch(resourcesInformationHolder);
+ PipeAgent.runtime().launchPipePluginAgent(resourcesInformationHolder);
}
private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
- if (allPipeInformation != null && !allPipeInformation.isEmpty()) {
- List<PipePluginMeta> list = new ArrayList<>();
+ final List<PipePluginMeta> list = new ArrayList<>();
+ if (allPipeInformation != null) {
for (ByteBuffer pipeInformationByteBuffer : allPipeInformation) {
list.add(PipePluginMeta.deserialize(pipeInformationByteBuffer));
}
- resourcesInformationHolder.setPipePluginMetaList(list);
}
+ resourcesInformationHolder.setPipePluginMetaList(list);
}
private void initSchemaEngine() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 6e0f82681a6..86feb49b8db 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1180,7 +1180,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus setSystemStatus(String status) throws TException {
try {
-
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.parse(status));
+ final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+ commonConfig.setNodeStatus(NodeStatus.parse(status));
+ if (commonConfig.getNodeStatus().equals(NodeStatus.Removing)) {
+ PipeAgent.runtime().stop();
+ }
} catch (Exception e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
}