This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5caeeb3e3b7 Pipe: automatically start pipe upon creation & allow to
start RUNNING pipe & allow to stop STOPPED pipe (#11690)
5caeeb3e3b7 is described below
commit 5caeeb3e3b7b19a9a06b54f25c9d02e4f39d4f7e
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Dec 12 14:46:18 2023 +0800
Pipe: automatically start pipe upon creation & allow to start RUNNING pipe
& allow to stop STOPPED pipe (#11690)
---
.../iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java | 22 ++++++++++-------
.../apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java | 2 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 28 ++++++++++++++--------
.../impl/pipe/task/CreatePipeProcedureV2.java | 2 ++
.../impl/pipe/task/StartPipeProcedureV2.java | 19 ++++++++++++++-
.../impl/pipe/task/StopPipeProcedureV2.java | 19 ++++++++++++++-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 6 ++---
7 files changed, 74 insertions(+), 24 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
index 6c5838488ca..da1b7cbca30 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSwitchStatusIT.java
@@ -83,11 +83,11 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualIT {
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertTrue(
- showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
+ showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertTrue(
- showPipeResult.stream().anyMatch((o) -> o.id.equals("p2") &&
o.state.equals("STOPPED")));
+ showPipeResult.stream().anyMatch((o) -> o.id.equals("p2") &&
o.state.equals("RUNNING")));
Assert.assertTrue(
- showPipeResult.stream().anyMatch((o) -> o.id.equals("p3") &&
o.state.equals("STOPPED")));
+ showPipeResult.stream().anyMatch((o) -> o.id.equals("p3") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
@@ -148,9 +148,8 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualIT {
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertTrue(
- showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
+ showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
- Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.stopPipe("p1").getCode());
status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
@@ -168,10 +167,17 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualIT {
showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
- TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("p1").getCode());
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
Assert.assertTrue(
- showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
+ showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
+ Assert.assertEquals(1, showPipeResult.stream().filter((o) ->
o.id.equals("p1")).count());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.stopPipe("p1").getCode());
+ showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
+ Assert.assertTrue(
+ showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
Assert.assertEquals(1, showPipeResult.stream().filter((o) ->
o.id.equals("p1")).count());
}
}
@@ -262,7 +268,7 @@ public class IoTDBPipeSwitchStatusIT extends
AbstractPipeDualIT {
Assert.assertEquals(TSStatusCode.PIPE_ERROR.getStatusCode(),
client.startPipe("*").getCode());
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertTrue(
- showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("STOPPED")));
+ showPipeResult.stream().anyMatch((o) -> o.id.equals("p1") &&
o.state.equals("RUNNING")));
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
index 58e248dc251..fe0be803332 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeSyntaxIT.java
@@ -80,7 +80,7 @@ public class IoTDBPipeSyntaxIT extends AbstractPipeDualIT {
for (String pipeName : validPipeNames) {
Assert.assertTrue(
showPipeResult.stream()
- .anyMatch((o) -> o.id.equals(pipeName) &&
o.state.equals("STOPPED")));
+ .anyMatch((o) -> o.id.equals(pipeName) &&
o.state.equals("RUNNING")));
}
for (String pipeName : validPipeNames) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index f6e0003695b..a71f7deb2d3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -158,16 +158,18 @@ public class PipeTaskInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- public void checkBeforeStartPipe(String pipeName) throws PipeException {
+ /** @return true if the pipe status is RUNNING before starting the pipe */
+ public boolean checkBeforeStartPipe(String pipeName) throws PipeException {
acquireReadLock();
try {
- checkBeforeStartPipeInternal(pipeName);
+ return checkBeforeStartPipeInternal(pipeName);
} finally {
releaseReadLock();
}
}
- private void checkBeforeStartPipeInternal(String pipeName) throws
PipeException {
+ /** @return true if the pipe status is RUNNING before starting the pipe */
+ private boolean checkBeforeStartPipeInternal(String pipeName) throws
PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to start pipe %s, the pipe does not exist",
pipeName);
@@ -179,8 +181,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
if (pipeStatus == PipeStatus.RUNNING) {
final String exceptionMessage =
String.format("Failed to start pipe %s, the pipe is already
running", pipeName);
- LOGGER.info(exceptionMessage);
- throw new PipeException(exceptionMessage);
+ LOGGER.warn(exceptionMessage);
+ return true;
}
if (pipeStatus == PipeStatus.DROPPED) {
final String exceptionMessage =
@@ -188,18 +190,22 @@ public class PipeTaskInfo implements SnapshotProcessor {
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
+
+ return false;
}
- public void checkBeforeStopPipe(String pipeName) throws PipeException {
+ /** @return true if the pipe status is STOPPED before stopping the pipe */
+ public boolean checkBeforeStopPipe(String pipeName) throws PipeException {
acquireReadLock();
try {
- checkBeforeStopPipeInternal(pipeName);
+ return checkBeforeStopPipeInternal(pipeName);
} finally {
releaseReadLock();
}
}
- private void checkBeforeStopPipeInternal(String pipeName) throws
PipeException {
+ /** @return true if the pipe status is STOPPED before stopping the pipe */
+ private boolean checkBeforeStopPipeInternal(String pipeName) throws
PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to stop pipe %s, the pipe does not exist",
pipeName);
@@ -211,8 +217,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
if (pipeStatus == PipeStatus.STOPPED) {
final String exceptionMessage =
String.format("Failed to stop pipe %s, the pipe is already stop",
pipeName);
- LOGGER.info(exceptionMessage);
- throw new PipeException(exceptionMessage);
+ LOGGER.warn(exceptionMessage);
+ return true;
}
if (pipeStatus == PipeStatus.DROPPED) {
final String exceptionMessage =
@@ -220,6 +226,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
+
+ return false;
}
public void checkBeforeDropPipe(String pipeName) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 9eac9be52d3..2e3e9b568b7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.schema.SchemaConstant;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
@@ -120,6 +121,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
});
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
+ pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 5b43ac83d8d..d07634ee8c1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -44,6 +44,12 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
private String pipeName;
+ // This variable is used to record whether the pipe status is RUNNING and to
determine whether to
+ // skip this procedure.
+ //
+ // Pure in-memory object, not involved in snapshot serialization and
deserialization.
+ private boolean canSkipSubsequentStages;
+
public StartPipeProcedureV2() {
super();
}
@@ -51,6 +57,7 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
public StartPipeProcedureV2(String pipeName) throws PipeException {
super();
this.pipeName = pipeName;
+ this.canSkipSubsequentStages = false;
}
@Override
@@ -62,7 +69,7 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
- pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
+ canSkipSubsequentStages =
pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
}
@Override
@@ -76,6 +83,11 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException {
LOGGER.info("StartPipeProcedureV2:
executeFromWriteConfigNodeConsensus({})", pipeName);
+ if (canSkipSubsequentStages) {
+ LOGGER.warn("Pipe status is RUNNING, skip
executeFromWriteConfigNodeConsensus({})", pipeName);
+ return;
+ }
+
TSStatus response;
try {
response =
@@ -96,6 +108,11 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
+ if (canSkipSubsequentStages) {
+ LOGGER.warn("Pipe status is RUNNING, skip
executeFromOperateOnDataNodes({})", pipeName);
+ return;
+ }
+
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 3ac00496a00..7e8cef01877 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -44,6 +44,12 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
private String pipeName;
+ // This variable is used to record whether the pipe status is STOPPED and to
determine whether to
+ // skip this procedure.
+ //
+ // Pure in-memory object, not involved in snapshot serialization and
deserialization.
+ private boolean canSkipSubsequentStages;
+
public StopPipeProcedureV2() {
super();
}
@@ -51,6 +57,7 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
public StopPipeProcedureV2(String pipeName) throws PipeException {
super();
this.pipeName = pipeName;
+ this.canSkipSubsequentStages = false;
}
@Override
@@ -62,7 +69,7 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
- pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
+ canSkipSubsequentStages = pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
}
@Override
@@ -76,6 +83,11 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException {
LOGGER.info("StopPipeProcedureV2:
executeFromWriteConfigNodeConsensus({})", pipeName);
+ if (canSkipSubsequentStages) {
+ LOGGER.warn("Pipe status is STOPPED, skip
executeFromWriteConfigNodeConsensus({})", pipeName);
+ return;
+ }
+
TSStatus response;
try {
response =
@@ -96,6 +108,11 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
+ if (canSkipSubsequentStages) {
+ LOGGER.warn("Pipe status is STOPPED, skip
executeFromOperateOnDataNodes({})", pipeName);
+ return;
+ }
+
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 1ab5eb7cc81..c1f6170e85a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -66,9 +66,9 @@ import java.util.stream.Collectors;
* State transition diagram of a pipe task:
*
* <p><code>
- * |----------------| |---------| --> start pipe -->
|---------| |---------|
- * | initial status | --> create pipe --> | STOPPED | |
RUNNING | --> drop pipe --> | DROPPED |
- * |----------------| |---------| <-- stop pipe <--
|---------| |---------|
+ * |----------------| |---------| --> stop pipe -->
|---------| |---------|
+ * | initial status | --> create pipe --> | RUNNING | |
STOPPED | --> drop pipe --> | DROPPED |
+ * |----------------| |---------| <-- start pipe <--
|---------| |---------|
* |
|
* | ----------------------> drop
pipe -----------------------> |
* </code>