This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ef6dc0a8e14 Pipe: enhance the idempotence of start / stop pipe
(#11713)
ef6dc0a8e14 is described below
commit ef6dc0a8e149cfaee705bc7795ada75f8c92025c
Author: V_Galaxy <[email protected]>
AuthorDate: Sat Dec 16 16:49:07 2023 +0800
Pipe: enhance the idempotence of start / stop pipe (#11713)
As title. This commit is a follow-up to #11690, from now onwards, no-op
start pipe and stop pipe will directly skip the entire procedure execution,
avoiding the impact of rollback and other exceptional situations. At the same
time, the procedure result is set to facilitate differentiation on CN side for
identifying no-op start pipe and stop pipe.
It is important to note that when starting a RUNNING pipe or stopping a
STOPPED pipe, we need to consider the `isStoppedByRuntimeException` status of
the pipe. Otherwise, certain procedures might be mistakenly skipped.
---
.../iotdb/confignode/manager/ProcedureManager.java | 17 ++++---
.../manager/pipe/task/PipeTaskCoordinator.java | 10 +----
.../confignode/persistence/pipe/PipeTaskInfo.java | 52 +++++++++++-----------
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 18 +++++++-
.../runtime/PipeHandleLeaderChangeProcedure.java | 3 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 3 +-
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 3 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 4 +-
.../impl/pipe/task/DropPipeProcedureV2.java | 4 +-
.../impl/pipe/task/StartPipeProcedureV2.java | 24 +++-------
.../impl/pipe/task/StopPipeProcedureV2.java | 24 +++-------
11 files changed, 78 insertions(+), 84 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index af3adc40c99..d616b109660 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -105,6 +105,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -753,7 +754,7 @@ public class ProcedureManager {
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
if (isSucceed) {
- return RpcUtils.SUCCESS_STATUS;
+ return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(statusList.get(0).getMessage());
@@ -770,7 +771,7 @@ public class ProcedureManager {
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
if (isSucceed) {
- return RpcUtils.SUCCESS_STATUS;
+ return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(statusList.get(0).getMessage());
@@ -787,7 +788,7 @@ public class ProcedureManager {
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
if (isSucceed) {
- return RpcUtils.SUCCESS_STATUS;
+ return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(statusList.get(0).getMessage());
@@ -804,7 +805,7 @@ public class ProcedureManager {
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
if (isSucceed) {
- return RpcUtils.SUCCESS_STATUS;
+ return statusList.get(0);
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(statusList.get(0).getMessage());
@@ -924,7 +925,13 @@ public class ProcedureManager {
continue;
}
if (finishedProcedure.isSuccess()) {
- statusList.add(StatusUtils.OK);
+ if (Objects.nonNull(finishedProcedure.getResult())) {
+ statusList.add(
+ RpcUtils.getStatus(
+ TSStatusCode.SUCCESS_STATUS,
Arrays.toString(finishedProcedure.getResult())));
+ } else {
+ statusList.add(StatusUtils.OK);
+ }
} else {
if (finishedProcedure.getException().getCause() instanceof
IoTDBException) {
IoTDBException e = (IoTDBException)
finishedProcedure.getException().getCause();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index f677fecb64d..e54710c167b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -120,15 +120,9 @@ public class PipeTaskCoordinator {
public TSStatus stopPipe(String pipeName) {
final boolean isStoppedByRuntimeException =
pipeTaskInfo.isStoppedByRuntimeException(pipeName);
final TSStatus status =
configManager.getProcedureManager().stopPipe(pipeName);
- if (status == RpcUtils.SUCCESS_STATUS) {
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (isStoppedByRuntimeException) {
- // Under normal circumstances, this branch is not executed because when
- // `isStoppedByRuntimeException` is true, the pipe status is most
likely to be STOPPED,
- // unless this method is called between the execution of
`autoRestartWithLock` and
- // `handleSuccessfulRestartWithLock` in `PipeMetaSyncer` (in this
case, the pipe status is
- // RUNNING and `isStoppedByRuntimeException` is true).
-
- // Even if return status is success, it doesn't imply the success of
the
+ // Even if the return status is success, it doesn't imply the success
of the
// `executeFromOperateOnDataNodes` phase of stopping pipe. However, we
still need to set
// `isStoppedByRuntimeException` to false to avoid auto-restart.
Meanwhile,
// `isStoppedByRuntimeException` does not need to be synchronized with
DNs.
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index a71f7deb2d3..aa9a985973e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -158,18 +158,16 @@ public class PipeTaskInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- /** @return true if the pipe status is RUNNING before starting the pipe */
- public boolean checkBeforeStartPipe(String pipeName) throws PipeException {
+ public void checkBeforeStartPipe(String pipeName) throws PipeException {
acquireReadLock();
try {
- return checkBeforeStartPipeInternal(pipeName);
+ checkBeforeStartPipeInternal(pipeName);
} finally {
releaseReadLock();
}
}
- /** @return true if the pipe status is RUNNING before starting the pipe */
- private boolean checkBeforeStartPipeInternal(String pipeName) throws
PipeException {
+ private void checkBeforeStartPipeInternal(String pipeName) throws
PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to start pipe %s, the pipe does not exist",
pipeName);
@@ -178,34 +176,24 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
final PipeStatus pipeStatus = getPipeStatus(pipeName);
- if (pipeStatus == PipeStatus.RUNNING) {
- final String exceptionMessage =
- String.format("Failed to start pipe %s, the pipe is already
running", pipeName);
- LOGGER.warn(exceptionMessage);
- return true;
- }
if (pipeStatus == PipeStatus.DROPPED) {
final String exceptionMessage =
String.format("Failed to start pipe %s, the pipe is already
dropped", pipeName);
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
-
- return false;
}
- /** @return true if the pipe status is STOPPED before stopping the pipe */
- public boolean checkBeforeStopPipe(String pipeName) throws PipeException {
+ public void checkBeforeStopPipe(String pipeName) throws PipeException {
acquireReadLock();
try {
- return checkBeforeStopPipeInternal(pipeName);
+ checkBeforeStopPipeInternal(pipeName);
} finally {
releaseReadLock();
}
}
- /** @return true if the pipe status is STOPPED before stopping the pipe */
- private boolean checkBeforeStopPipeInternal(String pipeName) throws
PipeException {
+ private void checkBeforeStopPipeInternal(String pipeName) throws
PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to stop pipe %s, the pipe does not exist",
pipeName);
@@ -214,20 +202,12 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
final PipeStatus pipeStatus = getPipeStatus(pipeName);
- if (pipeStatus == PipeStatus.STOPPED) {
- final String exceptionMessage =
- String.format("Failed to stop pipe %s, the pipe is already stop",
pipeName);
- LOGGER.warn(exceptionMessage);
- return true;
- }
if (pipeStatus == PipeStatus.DROPPED) {
final String exceptionMessage =
String.format("Failed to stop pipe %s, the pipe is already dropped",
pipeName);
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
-
- return false;
}
public void checkBeforeDropPipe(String pipeName) {
@@ -269,6 +249,26 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
+ public boolean isPipeRunning(String pipeName) {
+ acquireReadLock();
+ try {
+ return pipeMetaKeeper.containsPipeMeta(pipeName)
+ && PipeStatus.RUNNING.equals(getPipeStatus(pipeName));
+ } finally {
+ releaseReadLock();
+ }
+ }
+
+ public boolean isPipeStopped(String pipeName) {
+ acquireReadLock();
+ try {
+ return pipeMetaKeeper.containsPipeMeta(pipeName)
+ && PipeStatus.STOPPED.equals(getPipeStatus(pipeName));
+ } finally {
+ releaseReadLock();
+ }
+ }
+
/////////////////////////////// Pipe Task Management
///////////////////////////////
public TSStatus createPipe(CreatePipePlanV2 plan) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 6fb364c6466..17ad3a4f4fc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -71,6 +72,9 @@ public abstract class AbstractOperatePipeProcedureV2
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
+ private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
+ "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
+
@Override
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv
configNodeProcedureEnv) {
LOGGER.info("ProcedureId {} try to acquire pipe lock.", getProcId());
@@ -162,9 +166,12 @@ public abstract class AbstractOperatePipeProcedureV2
/**
* Execute at state {@link OperatePipeTaskState#VALIDATE_TASK}.
*
+ * @return true if this procedure can skip subsequent stages (start RUNNING
pipe or stop STOPPED
+ * pipe without runtime exception)
* @throws PipeException if validation for pipe parameters failed
*/
- protected abstract void executeFromValidateTask(ConfigNodeProcedureEnv env)
throws PipeException;
+ protected abstract boolean executeFromValidateTask(ConfigNodeProcedureEnv
env)
+ throws PipeException;
/** Execute at state {@link OperatePipeTaskState#CALCULATE_INFO_FOR_TASK}. */
protected abstract void
executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env);
@@ -199,7 +206,14 @@ public abstract class AbstractOperatePipeProcedureV2
try {
switch (state) {
case VALIDATE_TASK:
- executeFromValidateTask(env);
+ if (executeFromValidateTask(env)) {
+ LOGGER.warn("ProcedureId {}: {}", getProcId(),
SKIP_PIPE_PROCEDURE_MESSAGE);
+ // On client side, the message returned after the successful
execution of the pipe
+ // command corresponding to this procedure is "Msg: The statement
is executed
+ // successfully."
+
this.setResult(SKIP_PIPE_PROCEDURE_MESSAGE.getBytes(StandardCharsets.UTF_8));
+ return Flow.NO_MORE_STATE;
+ }
setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
break;
case CALCULATE_INFO_FOR_TASK:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 60626062808..a93d8fef580 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -67,10 +67,11 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
}
@Override
- protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+ protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromValidateTask");
// Nothing needs to be checked
+ return false;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 2e59aa287ca..9ab9c51d715 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -65,10 +65,11 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
}
@Override
- protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+ protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask");
// Do nothing
+ return false;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 4962c868eff..d1a68199c1d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -47,10 +47,11 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
+ protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
// Do nothing
+ return false;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 2e3e9b568b7..cafc91e77ee 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -75,7 +75,7 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
+ protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
LOGGER.info(
"CreatePipeProcedureV2: executeFromValidateTask({})",
createPipeRequest.getPipeName());
@@ -85,6 +85,8 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
.getPipePluginInfo()
.checkBeforeCreatePipe(createPipeRequest);
pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest);
+
+ return false;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 8958597b2a4..c2323fd76ec 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -58,10 +58,12 @@ public class DropPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
+ protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
LOGGER.info("DropPipeProcedureV2: executeFromValidateTask({})", pipeName);
pipeTaskInfo.get().checkBeforeDropPipe(pipeName);
+
+ return false;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index d07634ee8c1..4b5e6e740c5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -44,12 +44,6 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
private String pipeName;
- // This variable is used to record whether the pipe status is RUNNING and to
determine whether to
- // skip this procedure.
- //
- // Pure in-memory object, not involved in snapshot serialization and
deserialization.
- private boolean canSkipSubsequentStages;
-
public StartPipeProcedureV2() {
super();
}
@@ -57,7 +51,6 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
public StartPipeProcedureV2(String pipeName) throws PipeException {
super();
this.pipeName = pipeName;
- this.canSkipSubsequentStages = false;
}
@Override
@@ -66,10 +59,13 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
+ protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
- canSkipSubsequentStages =
pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
+ pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
+
+ return pipeTaskInfo.get().isPipeRunning(pipeName)
+ && !pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
}
@Override
@@ -83,11 +79,6 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException {
LOGGER.info("StartPipeProcedureV2:
executeFromWriteConfigNodeConsensus({})", pipeName);
- if (canSkipSubsequentStages) {
- LOGGER.warn("Pipe status is RUNNING, skip
executeFromWriteConfigNodeConsensus({})", pipeName);
- return;
- }
-
TSStatus response;
try {
response =
@@ -108,11 +99,6 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
- if (canSkipSubsequentStages) {
- LOGGER.warn("Pipe status is RUNNING, skip
executeFromOperateOnDataNodes({})", pipeName);
- return;
- }
-
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 7e8cef01877..2a88b4ffe73 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -44,12 +44,6 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
private String pipeName;
- // This variable is used to record whether the pipe status is STOPPED and to
determine whether to
- // skip this procedure.
- //
- // Pure in-memory object, not involved in snapshot serialization and
deserialization.
- private boolean canSkipSubsequentStages;
-
public StopPipeProcedureV2() {
super();
}
@@ -57,7 +51,6 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
public StopPipeProcedureV2(String pipeName) throws PipeException {
super();
this.pipeName = pipeName;
- this.canSkipSubsequentStages = false;
}
@Override
@@ -66,10 +59,13 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
+ protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
- canSkipSubsequentStages = pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
+ pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
+
+ return pipeTaskInfo.get().isPipeStopped(pipeName)
+ && !pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
}
@Override
@@ -83,11 +79,6 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
throws PipeException {
LOGGER.info("StopPipeProcedureV2:
executeFromWriteConfigNodeConsensus({})", pipeName);
- if (canSkipSubsequentStages) {
- LOGGER.warn("Pipe status is STOPPED, skip
executeFromWriteConfigNodeConsensus({})", pipeName);
- return;
- }
-
TSStatus response;
try {
response =
@@ -108,11 +99,6 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})",
pipeName);
- if (canSkipSubsequentStages) {
- LOGGER.warn("Pipe status is STOPPED, skip
executeFromOperateOnDataNodes({})", pipeName);
- return;
- }
-
String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName,
pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {