This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch compute-resource-balance-1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 076aed3caac45187dbc8a1231d0e4b565680387a
Author: 马子坤 <[email protected]>
AuthorDate: Mon Jul 17 11:54:51 2023 +0800

    [IOTDB-6064] Pipe: Fix deadlock in rolling back procedures concurrently (#10537)
---
 .../manager/pipe/task/PipeTaskCoordinator.java     |  28 +++---
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  | 106 ++++++++++-----------
 2 files changed, 67 insertions(+), 67 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index 886a728916b..e79509c0a9c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -116,7 +116,7 @@ public class PipeTaskCoordinator {
     final TSStatus status = 
configManager.getProcedureManager().stopPipe(pipeName);
     if (status == RpcUtils.SUCCESS_STATUS && isStoppedByRuntimeException) {
       LOGGER.info(
-          "Pipe {} has stopped successfully manually, stop its auto restart 
process.", pipeName);
+        "Pipe {} has stopped successfully manually, stop its auto restart 
process.", pipeName);
       pipeTaskInfo.setIsStoppedByRuntimeExceptionToFalse(pipeName);
       configManager.getProcedureManager().pipeHandleMetaChange(true, true);
     }
@@ -131,20 +131,20 @@ public class PipeTaskCoordinator {
       LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName, 
status);
     }
     return isPipeExistedBeforeDrop
-        ? status
-        : RpcUtils.getStatus(
-            TSStatusCode.PIPE_NOT_EXIST_ERROR,
-            String.format(
-                "Failed to drop pipe %s. Failures: %s does not exist.", 
pipeName, pipeName));
+      ? status
+      : RpcUtils.getStatus(
+      TSStatusCode.PIPE_NOT_EXIST_ERROR,
+      String.format(
+        "Failed to drop pipe %s. Failures: %s does not exist.", pipeName, 
pipeName));
   }
 
   public TShowPipeResp showPipes(TShowPipeReq req) {
     lock();
     try {
       return ((PipeTableResp)
-              configManager.getConsensusManager().read(new 
ShowPipePlanV2()).getDataset())
-          .filter(req.whereClause, req.pipeName)
-          .convertToTShowPipeResp();
+        configManager.getConsensusManager().read(new 
ShowPipePlanV2()).getDataset())
+        .filter(req.whereClause, req.pipeName)
+        .convertToTShowPipeResp();
     } finally {
       unlock();
     }
@@ -154,13 +154,13 @@ public class PipeTaskCoordinator {
     lock();
     try {
       return ((PipeTableResp)
-              configManager.getConsensusManager().read(new 
ShowPipePlanV2()).getDataset())
-          .convertToTGetAllPipeInfoResp();
+        configManager.getConsensusManager().read(new 
ShowPipePlanV2()).getDataset())
+        .convertToTGetAllPipeInfoResp();
     } catch (IOException e) {
       LOGGER.warn("Failed to get all pipe info.", e);
       return new TGetAllPipeInfoResp(
-          new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()),
-          Collections.emptyList());
+        new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()),
+        Collections.emptyList());
     } finally {
       unlock();
     }
@@ -174,4 +174,4 @@ public class PipeTaskCoordinator {
       unlock();
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index c0ef6d4bfc4..fe7a7a55b61 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -55,10 +55,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * executed in sequence and node procedures can be locked when a pipe task 
procedure is running.
  */
 public abstract class AbstractOperatePipeProcedureV2
-    extends AbstractNodeProcedure<OperatePipeTaskState> {
+  extends AbstractNodeProcedure<OperatePipeTaskState> {
 
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class);
+    LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class);
 
   private static final int RETRY_THRESHOLD = 1;
 
@@ -75,11 +75,11 @@ public abstract class AbstractOperatePipeProcedureV2
     try {
       if (configNodeProcedureEnv.getNodeLock().tryLock(this)) {
         pipeTaskInfo =
-            configNodeProcedureEnv
-                .getConfigManager()
-                .getPipeManager()
-                .getPipeTaskCoordinator()
-                .lock();
+          configNodeProcedureEnv
+            .getConfigManager()
+            .getPipeManager()
+            .getPipeTaskCoordinator()
+            .lock();
         LOGGER.info("ProcedureId {} acquire lock.", getProcId());
         return ProcedureLockState.LOCK_ACQUIRED;
       }
@@ -98,15 +98,15 @@ public abstract class AbstractOperatePipeProcedureV2
       LOGGER.info("ProcedureId {} release lock.", getProcId());
       if (pipeTaskInfo != null) {
         configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .unlock();
+          .getConfigManager()
+          .getPipeManager()
+          .getPipeTaskCoordinator()
+          .unlock();
       }
       if (configNodeProcedureEnv.getNodeLock().releaseLock(this)) {
         configNodeProcedureEnv
-            .getNodeLock()
-            .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler());
+          .getNodeLock()
+          .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler());
       }
     } finally {
       configNodeProcedureEnv.getSchedulerLock().unlock();
@@ -131,7 +131,7 @@ public abstract class AbstractOperatePipeProcedureV2
    * @throws PipeException if configNode consensus write failed
    */
   protected abstract void 
executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
-      throws PipeException;
+    throws PipeException;
 
   /**
    * Execute at state {@link OperatePipeTaskState#OPERATE_ON_DATA_NODES}.
@@ -140,11 +140,11 @@ public abstract class AbstractOperatePipeProcedureV2
    * @throws IOException Exception when Serializing to byte buffer
    */
   protected abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv 
env)
-      throws PipeException, IOException;
+    throws PipeException, IOException;
 
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, 
OperatePipeTaskState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
+    throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
     try {
       switch (state) {
         case VALIDATE_TASK:
@@ -164,30 +164,30 @@ public abstract class AbstractOperatePipeProcedureV2
           return Flow.NO_MORE_STATE;
         default:
           throw new UnsupportedOperationException(
-              String.format("Unknown state during executing 
operatePipeProcedure, %s", state));
+            String.format("Unknown state during executing 
operatePipeProcedure, %s", state));
       }
     } catch (Exception e) {
       // Retry before rollback
       if (getCycles() < RETRY_THRESHOLD) {
         LOGGER.warn(
-            "Encountered error when trying to {} at state [{}], retry [{}/{}]",
-            getOperation(),
-            state,
-            getCycles() + 1,
-            RETRY_THRESHOLD,
-            e);
+          "Encountered error when trying to {} at state [{}], retry [{}/{}]",
+          getOperation(),
+          state,
+          getCycles() + 1,
+          RETRY_THRESHOLD,
+          e);
         // Wait 3s for next retry
         TimeUnit.MILLISECONDS.sleep(3000L);
       } else {
         LOGGER.warn(
-            "All {} retries failed when trying to {} at state [{}], will 
rollback...",
-            RETRY_THRESHOLD,
-            getOperation(),
-            state,
-            e);
+          "All {} retries failed when trying to {} at state [{}], will 
rollback...",
+          RETRY_THRESHOLD,
+          getOperation(),
+          state,
+          e);
         setFailure(
-            new ProcedureException(
-                String.format("Fail to %s because %s", getOperation().name(), 
e.getMessage())));
+          new ProcedureException(
+            String.format("Fail to %s because %s", getOperation().name(), 
e.getMessage())));
       }
     }
     return Flow.HAS_MORE_STATE;
@@ -200,7 +200,7 @@ public abstract class AbstractOperatePipeProcedureV2
 
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, 
OperatePipeTaskState state)
-      throws IOException, InterruptedException, ProcedureException {
+    throws IOException, InterruptedException, ProcedureException {
     switch (state) {
       case VALIDATE_TASK:
         rollbackFromValidateTask(env);
@@ -237,7 +237,7 @@ public abstract class AbstractOperatePipeProcedureV2
   protected abstract void 
rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env);
 
   protected abstract void 
rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws IOException;
+    throws IOException;
 
   @Override
   protected OperatePipeTaskState getState(int stateId) {
@@ -263,7 +263,7 @@ public abstract class AbstractOperatePipeProcedureV2
    * @throws IOException Exception when Serializing to byte buffer
    */
   protected Map<Integer, TPushPipeMetaResp> 
pushPipeMetaToDataNodes(ConfigNodeProcedureEnv env)
-      throws IOException {
+    throws IOException {
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
       pipeMetaBinaryList.add(pipeMeta.serialize());
@@ -280,7 +280,7 @@ public abstract class AbstractOperatePipeProcedureV2
    * @return Error messages for the given pipe after pushing pipe meta
    */
   protected String parsePushPipeMetaExceptionForPipe(
-      String pipeName, Map<Integer, TPushPipeMetaResp> respMap) {
+    String pipeName, Map<Integer, TPushPipeMetaResp> respMap) {
     final StringBuilder exceptionMessageBuilder = new StringBuilder();
 
     for (Map.Entry<Integer, TPushPipeMetaResp> respEntry : respMap.entrySet()) 
{
@@ -290,30 +290,30 @@ public abstract class AbstractOperatePipeProcedureV2
       if (resp.getStatus().getCode() == 
TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) {
         if (!resp.isSetExceptionMessages()) {
           exceptionMessageBuilder.append(
-              String.format(
-                  "DataNodeId: %s, Message: Internal error while processing 
pushPipeMeta on dataNodes.",
-                  dataNodeId));
+            String.format(
+              "DataNodeId: %s, Message: Internal error while processing 
pushPipeMeta on dataNodes.",
+              dataNodeId));
           continue;
         }
 
         AtomicBoolean hasException = new AtomicBoolean(false);
 
         resp.getExceptionMessages()
-            .forEach(
-                message -> {
-                  // Ignore the timeStamp for simplicity
-                  if (pipeName == null) {
-                    hasException.set(true);
-                    exceptionMessageBuilder.append(
-                        String.format(
-                            "PipeName: %s, Message: %s",
-                            message.getPipeName(), message.getMessage()));
-                  } else if (pipeName.equals(message.getPipeName())) {
-                    hasException.set(true);
-                    exceptionMessageBuilder.append(
-                        String.format("Message: %s", message.getMessage()));
-                  }
-                });
+          .forEach(
+            message -> {
+              // Ignore the timeStamp for simplicity
+              if (pipeName == null) {
+                hasException.set(true);
+                exceptionMessageBuilder.append(
+                  String.format(
+                    "PipeName: %s, Message: %s",
+                    message.getPipeName(), message.getMessage()));
+              } else if (pipeName.equals(message.getPipeName())) {
+                hasException.set(true);
+                exceptionMessageBuilder.append(
+                  String.format("Message: %s", message.getMessage()));
+              }
+            });
 
         if (hasException.get()) {
           // Only print dataNodeId if the given pipe meets exception on that 
node
@@ -345,4 +345,4 @@ public abstract class AbstractOperatePipeProcedureV2
     super.deserialize(byteBuffer);
     isRollbackFromOperateOnDataNodesSuccessful = 
ReadWriteIOUtils.readBool(byteBuffer);
   }
-}
+}
\ No newline at end of file

Reply via email to