This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b754b03ea0 [IOTDB-5787] PipeTaskAgent: Pipe task management on data nodes (#9782)
b754b03ea0 is described below
commit b754b03ea081fda17e0bda9d27d8b7c177374613
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 8 09:03:00 2023 +0800
[IOTDB-5787] PipeTaskAgent: Pipe task management on data nodes (#9782)
---
.../confignode/persistence/pipe/PipeTaskInfo.java | 28 ++-
.../pipe/task/AbstractOperatePipeProcedureV2.java | 7 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 6 +-
.../commons/pipe/task/meta/PipeStaticMeta.java | 20 +-
.../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 2 +-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 270 ++++++++++++++++++++-
6 files changed, 307 insertions(+), 26 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index c3975a817d..fa8c1ae9cd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -81,11 +81,17 @@ public class PipeTaskInfo implements SnapshotProcessor {
return false;
}
- if (getPipeStatus(pipeName) == PipeStatus.RUNNING) {
+ final PipeStatus pipeStatus = getPipeStatus(pipeName);
+ if (pipeStatus == PipeStatus.RUNNING) {
LOGGER.info(
String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
return false;
}
+ if (pipeStatus == PipeStatus.DROPPED) {
+ LOGGER.info(
+ String.format("Failed to start pipe [%s], the pipe is already dropped", pipeName));
+ return false;
+ }
return true;
}
@@ -96,21 +102,29 @@ public class PipeTaskInfo implements SnapshotProcessor {
return false;
}
- if (getPipeStatus(pipeName) == PipeStatus.STOPPED) {
+ final PipeStatus pipeStatus = getPipeStatus(pipeName);
+ if (pipeStatus == PipeStatus.STOPPED) {
LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
return false;
}
+ if (pipeStatus == PipeStatus.DROPPED) {
+ LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already dropped", pipeName));
+ return false;
+ }
return true;
}
public boolean checkBeforeDropPipe(String pipeName) {
- if (isPipeExisted(pipeName)) {
- return true;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Check before drop pipe {}, pipe exists: {}.",
+ pipeName,
+ isPipeExisted(pipeName) ? "true" : "false");
}
-
- LOGGER.info(String.format("Failed to drop pipe [%s], the pipe does not exist", pipeName));
- return false;
+ // no matter whether the pipe exists, we allow the drop operation executed on all nodes to
+ // ensure the consistency
+ return true;
}
private boolean isPipeExisted(String pipeName) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 0d9f98101f..7b2f5cac3a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -129,8 +129,11 @@ abstract class AbstractOperatePipeProcedureV2 extends AbstractNodeProcedure<Oper
throws IOException, InterruptedException, ProcedureException {
switch (state) {
case VALIDATE_TASK:
- rollbackFromValidateTask(env);
- env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ try {
+ rollbackFromValidateTask(env);
+ } finally {
+ env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+ }
break;
case CALCULATE_INFO_FOR_TASK:
rollbackFromCalculateInfoForTask(env);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 995b2a977d..d81a88fe53 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -103,9 +103,9 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
.getLoadManager()
.getRegionLeaderMap()
.forEach(
- (region, leader) -> {
- consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader));
- });
+ (region, leader) ->
+ // TODO: make index configurable
+ consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader)));
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 14185dd8a5..eab7408386 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -32,7 +32,7 @@ import java.util.Map;
public class PipeStaticMeta {
private String pipeName;
- private long createTime;
+ private long creationTime;
private PipeParameters collectorParameters;
private PipeParameters processorParameters;
@@ -42,12 +42,12 @@ public class PipeStaticMeta {
public PipeStaticMeta(
String pipeName,
- long createTime,
+ long creationTime,
Map<String, String> collectorAttributes,
Map<String, String> processorAttributes,
Map<String, String> connectorAttributes) {
this.pipeName = pipeName.toUpperCase();
- this.createTime = createTime;
+ this.creationTime = creationTime;
collectorParameters = new PipeParameters(collectorAttributes);
processorParameters = new PipeParameters(processorAttributes);
connectorParameters = new PipeParameters(connectorAttributes);
@@ -57,8 +57,8 @@ public class PipeStaticMeta {
return pipeName;
}
- public long getCreateTime() {
- return createTime;
+ public long getCreationTime() {
+ return creationTime;
}
public PipeParameters getCollectorParameters() {
@@ -82,7 +82,7 @@ public class PipeStaticMeta {
public void serialize(DataOutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(pipeName, outputStream);
- ReadWriteIOUtils.write(createTime, outputStream);
+ ReadWriteIOUtils.write(creationTime, outputStream);
outputStream.writeInt(collectorParameters.getAttribute().size());
for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
@@ -110,7 +110,7 @@ public class PipeStaticMeta {
final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
- pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
+ pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
@@ -151,7 +151,7 @@ public class PipeStaticMeta {
}
PipeStaticMeta that = (PipeStaticMeta) obj;
return pipeName.equals(that.pipeName)
- && createTime == that.createTime
+ && creationTime == that.creationTime
&& collectorParameters.equals(that.collectorParameters)
&& processorParameters.equals(that.processorParameters)
&& connectorParameters.equals(that.connectorParameters);
@@ -168,8 +168,8 @@ public class PipeStaticMeta {
+ "pipeName='"
+ pipeName
+ '\''
- + ", createTime="
- + createTime
+ + ", creationTime="
+ + creationTime
+ ", collectorParameters="
+ collectorParameters.getAttribute()
+ ", processorParameters="
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index f490a171f1..501cbf0016 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
-/** PipeAgent is the entry point of the pipe module in DatNode. */
+/** PipeAgent is the entry point of the pipe module in DataNode. */
public class PipeAgent {
private final PipePluginAgent pipePluginAgent;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 9fffa86706..c0df1e1761 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -19,19 +19,283 @@
package org.apache.iotdb.db.pipe.agent.task;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * State transition diagram of a pipe task:
+ *
+ * <p><code>
+ * |----------------|                     |---------| --> start pipe --> |---------|                   |---------|
+ * | initial status | --> create pipe --> | STOPPED |                    | RUNNING | --> drop pipe --> | DROPPED |
+ * |----------------|                     |---------| <-- stop pipe <--  |---------|                   |---------|
+ *                                             |                                                            |
+ *                                             | ----------------------> drop pipe -----------------------> |
+ * </code>
+ *
+ * <p>Other transitions are not allowed, will be ignored when received in the pipe task agent.
+ */
public class PipeTaskAgent {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
+
private final PipeMetaKeeper pipeMetaKeeper;
public PipeTaskAgent() {
pipeMetaKeeper = new PipeMetaKeeper();
}
- // TODO: remove this method
- public PipeMeta getPipeMeta(String pipeName) {
- return pipeMetaKeeper.getPipeMeta(pipeName);
+ ////////////////////////// Pipe Task Management //////////////////////////
+
+ public void createPipe(PipeMeta pipeMeta) {
+ final String pipeName = pipeMeta.getStaticMeta().getPipeName();
+ final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
+
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+ if (existedPipeMeta != null) {
+ if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
+ switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+ case STOPPED:
+ case RUNNING:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been created. Current status = {}. Skip creating.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ case DROPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped, but the pipe task meta has not been cleaned up. "
+ + "Current status = {}. Try dropping the pipe and recreating it.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ // break to drop the pipe and recreate it
+ break;
+ default:
+ throw new IllegalStateException(
+ "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ }
+ }
+
+ // drop the pipe if
+ // 1. the pipe with the same name but with different creation time has been created before
+ // 2. the pipe with the same name and the same creation time has been dropped before, but the
+ // pipe task meta has not been cleaned up
+ dropPipe(pipeName, existedPipeMeta.getStaticMeta().getCreationTime());
+ }
+
+ // build pipe task by consensus group
+ pipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .forEach(
+ ((consensusGroupId, pipeTaskMeta) -> {
+ createPipeTaskByConsensusGroup(
+ pipeName, creationTime, consensusGroupId, pipeTaskMeta);
+ }));
+ // add pipe meta to pipe meta keeper
+ // note that we do not need to set the status of pipe meta here, because the status of pipe meta
+ // is already set to STOPPED when it is created
+ pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
+ }
+
+ public void createPipeTaskByConsensusGroup(
+ String pipeName,
+ long creationTime,
+ TConsensusGroupId consensusGroupId,
+ PipeTaskMeta pipeTaskMeta) {}
+
+ public void dropPipe(String pipeName, long creationTime) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ if (existedPipeMeta == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip dropping.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in dropPipe request. Skip dropping.",
+ pipeName,
+ existedPipeMeta.getStaticMeta().getCreationTime(),
+ creationTime);
+ return;
+ }
+
+ // mark pipe meta as dropped first. this will help us detect if the pipe meta has been dropped
+ // but the pipe task meta has not been cleaned up (in case of failure when executing
+ // dropPipeTaskByConsensusGroup).
+ existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
+ // drop pipe task by consensus group
+ existedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .forEach(
+ ((consensusGroupId, pipeTaskMeta) -> {
+ dropPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+ }));
+ // remove pipe meta from pipe meta keeper
+ pipeMetaKeeper.removePipeMeta(pipeName);
+ }
+
+ public void dropPipeTaskByConsensusGroup(
+ String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
+
+ public void dropPipe(String pipeName) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ if (existedPipeMeta == null) {
+ LOGGER.info(
+ "Pipe {} has already been dropped or has not been created. Skip dropping.", pipeName);
+ return;
+ }
+
+ // mark pipe meta as dropped first. this will help us detect if the pipe meta has been dropped
+ // but the pipe task meta has not been cleaned up (in case of failure when executing
+ // dropPipeTaskByConsensusGroup).
+ existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
+ // drop pipe task by consensus group
+ existedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .forEach(
+ ((consensusGroupId, pipeTaskMeta) -> {
+ dropPipeTaskByConsensusGroup(pipeName, consensusGroupId);
+ }));
+ // remove pipe meta from pipe meta keeper
+ pipeMetaKeeper.removePipeMeta(pipeName);
+ }
+
+ public void dropPipeTaskByConsensusGroup(String pipeName, TConsensusGroupId consensusGroupId) {}
+
+ public void startPipe(String pipeName, long creationTime) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ if (existedPipeMeta == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip starting.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in startPipe request. Skip starting.",
+ pipeName,
+ existedPipeMeta.getStaticMeta().getCreationTime(),
+ creationTime);
+ return;
+ }
+
+ switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+ case STOPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been created. Current status = {}. Starting.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ break;
+ case RUNNING:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been started. Current status = {}. Skip starting.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ case DROPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip starting.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ default:
+ throw new IllegalStateException(
+ "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ }
+
+ // start pipe task by consensus group
+ existedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .forEach(
+ ((consensusGroupId, pipeTaskMeta) -> {
+ startPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+ }));
+ // set pipe meta status to RUNNING
+ existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
+ }
+
+ public void startPipeTaskByConsensusGroup(
+ String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
+
+ public void stopPipe(String pipeName, long creationTime) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ if (existedPipeMeta == null) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped or has not been created. Skip stopping.",
+ pipeName,
+ creationTime);
+ return;
+ }
+ if (existedPipeMeta.getStaticMeta().getCreationTime() != creationTime) {
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been created but does not match the creation time ({}) in stopPipe request. Skip stopping.",
+ pipeName,
+ existedPipeMeta.getStaticMeta().getCreationTime(),
+ creationTime);
+ return;
+ }
+
+ switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+ case STOPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been stopped. Current status = {}. Skip stopping.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ case RUNNING:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has been started. Current status = {}. Stopping.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ break;
+ case DROPPED:
+ LOGGER.info(
+ "Pipe {} (creation time = {}) has already been dropped. Current status = {}. Skip stopping.",
+ pipeName,
+ creationTime,
+ existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ return;
+ default:
+ throw new IllegalStateException(
+ "Unexpected status: " + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+ }
+
+ // stop pipe task by consensus group
+ existedPipeMeta
+ .getRuntimeMeta()
+ .getConsensusGroupIdToTaskMetaMap()
+ .forEach(
+ ((consensusGroupId, pipeTaskMeta) -> {
+ stopPipeTaskByConsensusGroup(pipeName, creationTime, consensusGroupId);
+ }));
+ // set pipe meta status to STOPPED
+ existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
}
+
+ public void stopPipeTaskByConsensusGroup(
+ String pipeName, long creationTime, TConsensusGroupId consensusGroupId) {}
}