This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch 1.2-pipe-async-handle-heartbeat in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b804f5e4a2360e87cc794b785a701bc27ec7e732 Author: Steve Yurong Su <[email protected]> AuthorDate: Sun Jun 18 22:31:43 2023 +0800 Pipe: async execute pipeHandleLeaderChange and pipeHandleMetaChange to avoid causing heartbeats to time out --- .../iotdb/confignode/manager/ProcedureManager.java | 46 +++++++--------------- .../pipe/runtime/PipeRuntimeCoordinator.java | 45 +++++++-------------- 2 files changed, 29 insertions(+), 62 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index fc84b840245..1ba6a0df131 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -834,51 +834,35 @@ public class ProcedureManager { } } - public TSStatus pipeHandleLeaderChange( + public void pipeHandleLeaderChange( Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap) { try { - long procedureId = + final long procedureId = executor.submitProcedure( new PipeHandleLeaderChangeProcedure(dataRegionGroupToOldAndNewLeaderPairMap)); - List<TSStatus> statusList = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; - } else { - return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(statusList.get(0).getMessage()); - } + LOGGER.info("PipeHandleLeaderChangeProcedure was submitted, procedureId: {}.", procedureId); } catch (Exception e) { - return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); + LOGGER.warn("PipeHandleLeaderChangeProcedure was failed to submit.", e); } } - public TSStatus pipeMetaSync() { + public void pipeHandleMetaChange( + int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) { try { - long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure()); - List<TSStatus> statusList = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; - } else { - return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(statusList.get(0).getMessage()); - } + final long procedureId = + executor.submitProcedure( + new PipeHandleMetaChangeProcedure(dataNodeId, pipeMetaByteBufferListFromDataNode)); + LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId: {}.", procedureId); } catch (Exception e) { - return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); + LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e); } } - public TSStatus pipeHandleMetaChange( - int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) { + public TSStatus pipeMetaSync() { try { - long procedureId = - executor.submitProcedure( - new PipeHandleMetaChangeProcedure(dataNodeId, pipeMetaByteBufferListFromDataNode)); - List<TSStatus> statusList = new ArrayList<>(); - boolean isSucceed = + final long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure()); + final List<TSStatus> statusList = new ArrayList<>(); + final boolean isSucceed = waitingProcedureFinished(Collections.singletonList(procedureId), statusList); if (isSucceed) { return RpcUtils.SUCCESS_STATUS; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java index 00182aac69d..f73eb150d0e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java +++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java @@ -21,18 +21,14 @@ package org.apache.iotdb.confignode.manager.pipe.runtime; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent; import org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent; import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.Pair; import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.HashMap; @@ -41,8 +37,6 @@ import java.util.Map; public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeCoordinator.class); - private final ConfigManager configManager; private final PipeMetaSyncer pipeMetaSyncer; @@ -90,23 +84,9 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { return; } - final TSStatus result = - configManager - .getProcedureManager() - .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap); - if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn( - "PipeRuntimeCoordinator meets error in handling data region leader change, status: ({})", - result); - } - } - - public void startPipeMetaSync() { - pipeMetaSyncer.start(); - } - - public void stopPipeMetaSync() { - pipeMetaSyncer.stop(); + configManager + .getProcedureManager() + .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap); } /** @@ -117,13 +97,16 @@ public class PipeRuntimeCoordinator 
implements IClusterStatusSubscriber { */ public void parseHeartbeat( int dataNodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromDataNode) { - final TSStatus result = - configManager - .getProcedureManager() - .pipeHandleMetaChange(dataNodeId, pipeMetaByteBufferListFromDataNode); - if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.warn( - "PipeTaskCoordinator meets error in handling pipe meta change, status: ({})", result); - } + configManager + .getProcedureManager() + .pipeHandleMetaChange(dataNodeId, pipeMetaByteBufferListFromDataNode); + } + + public void startPipeMetaSync() { + pipeMetaSyncer.start(); + } + + public void stopPipeMetaSync() { + pipeMetaSyncer.stop(); } }
