This is an automated email from the ASF dual-hosted git repository. The user yongzao pushed a commit to the branch compute-resource-balance-1.2 of the repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 076aed3caac45187dbc8a1231d0e4b565680387a Author: 马子坤 <[email protected]> AuthorDate: Mon Jul 17 11:54:51 2023 +0800 [IOTDB-6064] Pipe: Fix deadlock in rolling back procedures concurrently (#10537) --- .../manager/pipe/task/PipeTaskCoordinator.java | 28 +++--- .../impl/pipe/AbstractOperatePipeProcedureV2.java | 106 ++++++++++----------- 2 files changed, 67 insertions(+), 67 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java index 886a728916b..e79509c0a9c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java @@ -116,7 +116,7 @@ public class PipeTaskCoordinator { final TSStatus status = configManager.getProcedureManager().stopPipe(pipeName); if (status == RpcUtils.SUCCESS_STATUS && isStoppedByRuntimeException) { LOGGER.info( - "Pipe {} has stopped successfully manually, stop its auto restart process.", pipeName); + "Pipe {} has stopped successfully manually, stop its auto restart process.", pipeName); pipeTaskInfo.setIsStoppedByRuntimeExceptionToFalse(pipeName); configManager.getProcedureManager().pipeHandleMetaChange(true, true); } @@ -131,20 +131,20 @@ public class PipeTaskCoordinator { LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName, status); } return isPipeExistedBeforeDrop - ? status - : RpcUtils.getStatus( - TSStatusCode.PIPE_NOT_EXIST_ERROR, - String.format( - "Failed to drop pipe %s. Failures: %s does not exist.", pipeName, pipeName)); + ? status + : RpcUtils.getStatus( + TSStatusCode.PIPE_NOT_EXIST_ERROR, + String.format( + "Failed to drop pipe %s. 
Failures: %s does not exist.", pipeName, pipeName)); } public TShowPipeResp showPipes(TShowPipeReq req) { lock(); try { return ((PipeTableResp) - configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) - .filter(req.whereClause, req.pipeName) - .convertToTShowPipeResp(); + configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) + .filter(req.whereClause, req.pipeName) + .convertToTShowPipeResp(); } finally { unlock(); } @@ -154,13 +154,13 @@ public class PipeTaskCoordinator { lock(); try { return ((PipeTableResp) - configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) - .convertToTGetAllPipeInfoResp(); + configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) + .convertToTGetAllPipeInfoResp(); } catch (IOException e) { LOGGER.warn("Failed to get all pipe info.", e); return new TGetAllPipeInfoResp( - new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()), - Collections.emptyList()); + new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()), + Collections.emptyList()); } finally { unlock(); } @@ -174,4 +174,4 @@ public class PipeTaskCoordinator { unlock(); } } -} +} \ No newline at end of file diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index c0ef6d4bfc4..fe7a7a55b61 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -55,10 +55,10 @@ import java.util.concurrent.atomic.AtomicReference; * executed in sequence and node procedures can be locked when a pipe task procedure is running. 
*/ public abstract class AbstractOperatePipeProcedureV2 - extends AbstractNodeProcedure<OperatePipeTaskState> { + extends AbstractNodeProcedure<OperatePipeTaskState> { private static final Logger LOGGER = - LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class); + LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class); private static final int RETRY_THRESHOLD = 1; @@ -75,11 +75,11 @@ public abstract class AbstractOperatePipeProcedureV2 try { if (configNodeProcedureEnv.getNodeLock().tryLock(this)) { pipeTaskInfo = - configNodeProcedureEnv - .getConfigManager() - .getPipeManager() - .getPipeTaskCoordinator() - .lock(); + configNodeProcedureEnv + .getConfigManager() + .getPipeManager() + .getPipeTaskCoordinator() + .lock(); LOGGER.info("ProcedureId {} acquire lock.", getProcId()); return ProcedureLockState.LOCK_ACQUIRED; } @@ -98,15 +98,15 @@ public abstract class AbstractOperatePipeProcedureV2 LOGGER.info("ProcedureId {} release lock.", getProcId()); if (pipeTaskInfo != null) { configNodeProcedureEnv - .getConfigManager() - .getPipeManager() - .getPipeTaskCoordinator() - .unlock(); + .getConfigManager() + .getPipeManager() + .getPipeTaskCoordinator() + .unlock(); } if (configNodeProcedureEnv.getNodeLock().releaseLock(this)) { configNodeProcedureEnv - .getNodeLock() - .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler()); + .getNodeLock() + .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler()); } } finally { configNodeProcedureEnv.getSchedulerLock().unlock(); @@ -131,7 +131,7 @@ public abstract class AbstractOperatePipeProcedureV2 * @throws PipeException if configNode consensus write failed */ protected abstract void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) - throws PipeException; + throws PipeException; /** * Execute at state {@link OperatePipeTaskState#OPERATE_ON_DATA_NODES}. 
@@ -140,11 +140,11 @@ public abstract class AbstractOperatePipeProcedureV2 * @throws IOException Exception when Serializing to byte buffer */ protected abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) - throws PipeException, IOException; + throws PipeException, IOException; @Override protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { try { switch (state) { case VALIDATE_TASK: @@ -164,30 +164,30 @@ public abstract class AbstractOperatePipeProcedureV2 return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException( - String.format("Unknown state during executing operatePipeProcedure, %s", state)); + String.format("Unknown state during executing operatePipeProcedure, %s", state)); } } catch (Exception e) { // Retry before rollback if (getCycles() < RETRY_THRESHOLD) { LOGGER.warn( - "Encountered error when trying to {} at state [{}], retry [{}/{}]", - getOperation(), - state, - getCycles() + 1, - RETRY_THRESHOLD, - e); + "Encountered error when trying to {} at state [{}], retry [{}/{}]", + getOperation(), + state, + getCycles() + 1, + RETRY_THRESHOLD, + e); // Wait 3s for next retry TimeUnit.MILLISECONDS.sleep(3000L); } else { LOGGER.warn( - "All {} retries failed when trying to {} at state [{}], will rollback...", - RETRY_THRESHOLD, - getOperation(), - state, - e); + "All {} retries failed when trying to {} at state [{}], will rollback...", + RETRY_THRESHOLD, + getOperation(), + state, + e); setFailure( - new ProcedureException( - String.format("Fail to %s because %s", getOperation().name(), e.getMessage()))); + new ProcedureException( + String.format("Fail to %s because %s", getOperation().name(), e.getMessage()))); } } return Flow.HAS_MORE_STATE; @@ -200,7 +200,7 @@ public abstract class AbstractOperatePipeProcedureV2 
@Override protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState state) - throws IOException, InterruptedException, ProcedureException { + throws IOException, InterruptedException, ProcedureException { switch (state) { case VALIDATE_TASK: rollbackFromValidateTask(env); @@ -237,7 +237,7 @@ public abstract class AbstractOperatePipeProcedureV2 protected abstract void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env); protected abstract void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) - throws IOException; + throws IOException; @Override protected OperatePipeTaskState getState(int stateId) { @@ -263,7 +263,7 @@ public abstract class AbstractOperatePipeProcedureV2 * @throws IOException Exception when Serializing to byte buffer */ protected Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(ConfigNodeProcedureEnv env) - throws IOException { + throws IOException { final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>(); for (PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); @@ -280,7 +280,7 @@ public abstract class AbstractOperatePipeProcedureV2 * @return Error messages for the given pipe after pushing pipe meta */ protected String parsePushPipeMetaExceptionForPipe( - String pipeName, Map<Integer, TPushPipeMetaResp> respMap) { + String pipeName, Map<Integer, TPushPipeMetaResp> respMap) { final StringBuilder exceptionMessageBuilder = new StringBuilder(); for (Map.Entry<Integer, TPushPipeMetaResp> respEntry : respMap.entrySet()) { @@ -290,30 +290,30 @@ public abstract class AbstractOperatePipeProcedureV2 if (resp.getStatus().getCode() == TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) { if (!resp.isSetExceptionMessages()) { exceptionMessageBuilder.append( - String.format( - "DataNodeId: %s, Message: Internal error while processing pushPipeMeta on dataNodes.", - dataNodeId)); + String.format( + "DataNodeId: %s, Message: Internal error while processing 
pushPipeMeta on dataNodes.", + dataNodeId)); continue; } AtomicBoolean hasException = new AtomicBoolean(false); resp.getExceptionMessages() - .forEach( - message -> { - // Ignore the timeStamp for simplicity - if (pipeName == null) { - hasException.set(true); - exceptionMessageBuilder.append( - String.format( - "PipeName: %s, Message: %s", - message.getPipeName(), message.getMessage())); - } else if (pipeName.equals(message.getPipeName())) { - hasException.set(true); - exceptionMessageBuilder.append( - String.format("Message: %s", message.getMessage())); - } - }); + .forEach( + message -> { + // Ignore the timeStamp for simplicity + if (pipeName == null) { + hasException.set(true); + exceptionMessageBuilder.append( + String.format( + "PipeName: %s, Message: %s", + message.getPipeName(), message.getMessage())); + } else if (pipeName.equals(message.getPipeName())) { + hasException.set(true); + exceptionMessageBuilder.append( + String.format("Message: %s", message.getMessage())); + } + }); if (hasException.get()) { // Only print dataNodeId if the given pipe meets exception on that node @@ -345,4 +345,4 @@ public abstract class AbstractOperatePipeProcedureV2 super.deserialize(byteBuffer); isRollbackFromOperateOnDataNodesSuccessful = ReadWriteIOUtils.readBool(byteBuffer); } -} +} \ No newline at end of file
