This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ebac6b7e11 Modify strategy of wait task time limitation & Fix proc id (#12552)
0ebac6b7e11 is described below

commit 0ebac6b7e11bf2d7b9994f7f39c20a4cb0511d48
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu May 23 12:10:47 2024 +0800

    Modify strategy of wait task time limitation & Fix proc id (#12552)
---
 .../procedure/env/RegionMaintainHandler.java       | 40 +++++++++++++---------
 .../impl/region/AddRegionPeerProcedure.java        |  4 +--
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 4dac3f311cb..baa4fefad66 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
@@ -381,13 +382,11 @@ public class RegionMaintainHandler {
 
   // TODO: will use 'procedure yield' to refactor later
   public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation 
dataNodeLocation) {
-    // In some cases the DataNode is still working, but its status is unknown.
-    // In order to make task continue under this circumstance, some 
unconditional retries are
-    // performed here.
-    int unconditionallyRetry = 0;
-    while (unconditionallyRetry < 6
-        || 
configManager.getLoadManager().getNodeStatus(dataNodeLocation.getDataNodeId())
-            != NodeStatus.Unknown) {
+    final long MAX_DISCONNECTION_TOLERATE_MS = 600_000;
+    final long INITIAL_DISCONNECTION_TOLERATE_MS = 60_000;
+    long startTime = System.nanoTime();
+    long lastReportTime = System.nanoTime();
+    while (true) {
       try (SyncDataNodeInternalServiceClient dataNodeClient =
           
dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) {
         TRegionMigrateResult report = 
dataNodeClient.getRegionMaintainResult(taskId);
@@ -396,20 +395,29 @@ public class RegionMaintainHandler {
         }
       } catch (Exception ignore) {
 
-      } finally {
-        try {
-          TimeUnit.SECONDS.sleep(1);
-        } catch (InterruptedException ignore) {
-          Thread.currentThread().interrupt();
-        }
       }
-      unconditionallyRetry++;
+      long waitTime =
+          Math.min(
+              INITIAL_DISCONNECTION_TOLERATE_MS
+                  + TimeUnit.NANOSECONDS.toMillis(lastReportTime - startTime) 
/ 60,
+              MAX_DISCONNECTION_TOLERATE_MS);
+      long disconnectionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() 
- lastReportTime);
+      if (disconnectionTime > waitTime) {
+        break;
+      }
+      try {
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException ignore) {
+        Thread.currentThread().interrupt();
+      }
     }
     LOGGER.warn(
-        "{} task {} cannot contact to DataNode {}",
+        "{} task {} cannot get task report from DataNode {}, last report time 
is {} ago",
         REGION_MIGRATE_PROCESS,
         taskId,
-        dataNodeLocation);
+        dataNodeLocation,
+        CommonDateTimeUtils.convertMillisecondToDurationStr(
+            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
lastReportTime)));
     TRegionMigrateResult report = new TRegionMigrateResult();
     report.setTaskStatus(TRegionMaintainTaskStatus.FAIL);
     report.setFailedNodeAndReason(new HashMap<>());
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index f8fab54948d..638009104f6 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -165,9 +165,9 @@ public class AddRegionPeerProcedure
       ConfigNodeProcedureEnv env, RegionMaintainHandler handler, String 
reason, Exception e)
       throws ProcedureException {
     if (e != null) {
-      LOGGER.warn("[pid{}][AddRegion] Start to roll back, because: {}", 
getRootProcId(), reason, e);
+      LOGGER.warn("[pid{}][AddRegion] Start to roll back, because: {}", 
getProcId(), reason, e);
     } else {
-      LOGGER.warn("[pid{}][AddRegion] Start to roll back, because: {}", 
getRootProcId(), reason);
+      LOGGER.warn("[pid{}][AddRegion] Start to roll back, because: {}", 
getProcId(), reason);
     }
     handler.removeRegionLocation(consensusGroupId, destDataNode);
 

Reply via email to