This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-async-handle-heartbeat
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 705b4aa766ff619575a01a5537b501c1a707decb
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Jun 18 22:31:43 2023 +0800

    Pipe: async execute pipeHandleLeaderChange and pipeHandleLeaderChange to 
avoid causing heartbeats to timeout
---
 .../iotdb/confignode/manager/ProcedureManager.java | 46 +++++++---------------
 .../pipe/runtime/PipeRuntimeCoordinator.java       | 45 +++++++--------------
 2 files changed, 29 insertions(+), 62 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index fc84b840245..1ba6a0df131 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -834,51 +834,35 @@ public class ProcedureManager {
     }
   }
 
-  public TSStatus pipeHandleLeaderChange(
+  public void pipeHandleLeaderChange(
       Map<TConsensusGroupId, Pair<Integer, Integer>> 
dataRegionGroupToOldAndNewLeaderPairMap) {
     try {
-      long procedureId =
+      final long procedureId =
           executor.submitProcedure(
               new 
PipeHandleLeaderChangeProcedure(dataRegionGroupToOldAndNewLeaderPairMap));
-      List<TSStatus> statusList = new ArrayList<>();
-      boolean isSucceed =
-          waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
-      if (isSucceed) {
-        return RpcUtils.SUCCESS_STATUS;
-      } else {
-        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
-            .setMessage(statusList.get(0).getMessage());
-      }
+      LOGGER.info("PipeHandleLeaderChangeProcedure was submitted, procedureId: 
{}.", procedureId);
     } catch (Exception e) {
-      return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+      LOGGER.warn("PipeHandleLeaderChangeProcedure was failed to submit.", e);
     }
   }
 
-  public TSStatus pipeMetaSync() {
+  public void pipeHandleMetaChange(
+      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
     try {
-      long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure());
-      List<TSStatus> statusList = new ArrayList<>();
-      boolean isSucceed =
-          waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
-      if (isSucceed) {
-        return RpcUtils.SUCCESS_STATUS;
-      } else {
-        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
-            .setMessage(statusList.get(0).getMessage());
-      }
+      final long procedureId =
+          executor.submitProcedure(
+              new PipeHandleMetaChangeProcedure(dataNodeId, 
pipeMetaByteBufferListFromDataNode));
+      LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId: 
{}.", procedureId);
     } catch (Exception e) {
-      return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+      LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e);
     }
   }
 
-  public TSStatus pipeHandleMetaChange(
-      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
+  public TSStatus pipeMetaSync() {
     try {
-      long procedureId =
-          executor.submitProcedure(
-              new PipeHandleMetaChangeProcedure(dataNodeId, 
pipeMetaByteBufferListFromDataNode));
-      List<TSStatus> statusList = new ArrayList<>();
-      boolean isSucceed =
+      final long procedureId = executor.submitProcedure(new 
PipeMetaSyncProcedure());
+      final List<TSStatus> statusList = new ArrayList<>();
+      final boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
       if (isSucceed) {
         return RpcUtils.SUCCESS_STATUS;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 00182aac69d..f73eb150d0e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -21,18 +21,14 @@ package org.apache.iotdb.confignode.manager.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
 import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -41,8 +37,6 @@ import java.util.Map;
 
 public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRuntimeCoordinator.class);
-
   private final ConfigManager configManager;
 
   private final PipeMetaSyncer pipeMetaSyncer;
@@ -90,23 +84,9 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
       return;
     }
 
-    final TSStatus result =
-        configManager
-            .getProcedureManager()
-            .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap);
-    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn(
-          "PipeRuntimeCoordinator meets error in handling data region leader 
change, status: ({})",
-          result);
-    }
-  }
-
-  public void startPipeMetaSync() {
-    pipeMetaSyncer.start();
-  }
-
-  public void stopPipeMetaSync() {
-    pipeMetaSyncer.stop();
+    configManager
+        .getProcedureManager()
+        .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap);
   }
 
   /**
@@ -117,13 +97,16 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
    */
   public void parseHeartbeat(
       int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
-    final TSStatus result =
-        configManager
-            .getProcedureManager()
-            .pipeHandleMetaChange(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
-    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn(
-          "PipeTaskCoordinator meets error in handling pipe meta change, 
status: ({})", result);
-    }
+    configManager
+        .getProcedureManager()
+        .pipeHandleMetaChange(dataNodeId, pipeMetaByteBufferListFromDataNode);
+  }
+
+  public void startPipeMetaSync() {
+    pipeMetaSyncer.start();
+  }
+
+  public void stopPipeMetaSync() {
+    pipeMetaSyncer.stop();
   }
 }

Reply via email to