This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch HighAvailability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 396569ea003c503d2d4ea82e2382ed2f019d8f29
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Oct 23 16:33:09 2023 +0800

    [IOTDB-6200] Change schema template to device template
---
 .../consensus/iot/client/DispatchLogHandler.java   |  2 +-
 .../metadata/SeriesOverflowException.java          |  5 +-
 .../plan/planner/plan/FragmentInstance.java        | 11 +++
 .../plan/scheduler/AsyncPlanNodeSender.java        | 87 +++++++++++++++-------
 .../plan/scheduler/AsyncSendPlanNodeHandler.java   | 27 ++++++-
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 26 +++++++
 .../apache/iotdb/commons/client/ThriftClient.java  |  2 +-
 7 files changed, 127 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index b851b29981b..193adc6668b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -69,7 +69,7 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - 
createTime);
   }
 
-  private boolean needRetry(int statusCode) {
+  public static boolean needRetry(int statusCode) {
     return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
         || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
         || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
index f8def6004cf..72dd6b75cce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
@@ -28,10 +28,7 @@ public class SeriesOverflowException extends 
MetadataException {
   public SeriesOverflowException(long memoryUsage, long seriesNum) {
     super(
         String.format(
-            "There are too many timeseries in memory. "
-                + "Current memory usage is %s and series num is %s. "
-                + "Please increase MAX_HEAP_SIZE in datanode-env.sh/bat, "
-                + "restart and create timeseries again.",
+            "Too many timeseries in memory without device template (current memory: %s, series num: %s). To optimize memory, DEVICE TEMPLATE is recommended when devices have the same time series.",
             memoryUsage, seriesNum),
         TSStatusCode.SERIES_OVERFLOW.getStatusCode());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 9a211c8384a..fcef7b54a5e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -74,6 +74,9 @@ public class FragmentInstance implements IConsensusRequest {
 
   private boolean isHighestPriority;
 
+  // indicates which replica index we are retrying
+  private transient int nextRetryIndex = 0;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -271,6 +274,14 @@ public class FragmentInstance implements IConsensusRequest 
{
     return hostDataNode;
   }
 
+  public TDataNodeLocation getNextRetriedHostDataNode() {
+    nextRetryIndex =
+        (nextRetryIndex + 1) % 
executorType.getRegionReplicaSet().getDataNodeLocations().size();
+    this.hostDataNode =
+        
executorType.getRegionReplicaSet().getDataNodeLocations().get(nextRetryIndex);
+    return hostDataNode;
+  }
+
   public long getTimeOut() {
     return timeOut;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index 525397123ba..c97c10e99de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -35,15 +35,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static com.google.common.util.concurrent.Futures.immediateFuture;
-
 public class AsyncPlanNodeSender {
 
   private static final Logger logger = 
LoggerFactory.getLogger(AsyncPlanNodeSender.class);
@@ -53,8 +51,11 @@ public class AsyncPlanNodeSender {
 
   private final Map<TEndPoint, BatchRequestWithIndex> batchRequests;
   private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap;
+
+  private final List<Integer> needRetryInstanceIndex;
+
   private final AtomicLong pendingNumber;
-  private final long startSendTime;
+  private long startSendTime;
 
   public AsyncPlanNodeSender(
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -77,6 +78,7 @@ public class AsyncPlanNodeSender {
                   instances.get(i).getRegionReplicaSet().getRegionId()));
     }
     this.instanceId2RespMap = new ConcurrentHashMap<>(instances.size() + 1, 1);
+    this.needRetryInstanceIndex = Collections.synchronizedList(new 
ArrayList<>());
     this.pendingNumber = new AtomicLong(batchRequests.keySet().size());
   }
 
@@ -84,7 +86,11 @@ public class AsyncPlanNodeSender {
     for (Map.Entry<TEndPoint, BatchRequestWithIndex> entry : 
batchRequests.entrySet()) {
       AsyncSendPlanNodeHandler handler =
           new AsyncSendPlanNodeHandler(
-              entry.getValue().getIndexes(), pendingNumber, 
instanceId2RespMap, startSendTime);
+              entry.getValue().getIndexes(),
+              pendingNumber,
+              instanceId2RespMap,
+              needRetryInstanceIndex,
+              startSendTime);
       try {
         AsyncDataNodeInternalServiceClient client =
             asyncInternalServiceClientManager.borrowClient(entry.getKey());
@@ -135,32 +141,61 @@ public class AsyncPlanNodeSender {
     return failureStatusList;
   }
 
-  public Future<FragInstanceDispatchResult> getResult() {
-    for (Map.Entry<Integer, TSendSinglePlanNodeResp> entry : 
instanceId2RespMap.entrySet()) {
-      if (!entry.getValue().accepted) {
-        logger.warn(
-            "dispatch write failed. status: {}, code: {}, message: {}, node 
{}",
-            entry.getValue().status,
-            TSStatusCode.representOf(entry.getValue().status.code),
-            entry.getValue().message,
-            
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
-        if (entry.getValue().getStatus() == null) {
-          return immediateFuture(
-              new FragInstanceDispatchResult(
-                  RpcUtils.getStatus(
-                      TSStatusCode.WRITE_PROCESS_ERROR, 
entry.getValue().getMessage())));
-        } else {
-          return immediateFuture(new 
FragInstanceDispatchResult(entry.getValue().getStatus()));
-        }
-      }
+  public boolean needRetry() {
+    // retried FI list is not empty and data region replica number is greater 
than 1
+    return !needRetryInstanceIndex.isEmpty()
+        && instances.get(0).getRegionReplicaSet().dataNodeLocations.size() > 1;
+  }
+
+  /**
+   * This function should be called after all responses of the last batch have been received. It
+   * will do the cleaning work, so the caller won't need to do any cleaning outside this function.
+   * We will retry all failed FIs of the last batch, whose indexes were all saved in
+   * needRetryInstanceIndex; if there are still failed FIs this time, they will also be saved in
+   * needRetryInstanceIndex.
+   *
+   * <p>It's a synchronous function, which means that once this function returns, the results of
+   * this retry have been received, and you don't need to call waitUntilCompleted.
+   */
+  public void retry() throws InterruptedException {
+    // 1. rebuild the batchRequests using remaining failed FIs, change the 
replica for each failed
+    // FI in this step
+    batchRequests.clear();
+    for (int fragmentInstanceIndex : needRetryInstanceIndex) {
+      this.batchRequests
+          .computeIfAbsent(
+              instances
+                  .get(fragmentInstanceIndex)
+                  .getNextRetriedHostDataNode()
+                  .getInternalEndPoint(),
+              x -> new BatchRequestWithIndex())
+          .addSinglePlanNodeReq(
+              fragmentInstanceIndex,
+              new TSendSinglePlanNodeReq(
+                  new TPlanNode(
+                      instances
+                          .get(fragmentInstanceIndex)
+                          .getFragment()
+                          .getPlanNodeTree()
+                          .serializeToByteBuffer()),
+                  
instances.get(fragmentInstanceIndex).getRegionReplicaSet().getRegionId()));
     }
-    return immediateFuture(new FragInstanceDispatchResult(true));
+
+    // 2. reset the pendingNumber, needRetryInstanceIndex and startSendTime
+    needRetryInstanceIndex.clear();
+    pendingNumber.set(batchRequests.keySet().size());
+    startSendTime = System.nanoTime();
+
+    // 3. call sendAll() to retry
+    sendAll();
+
+    // 4. call waitUntilCompleted() to wait for the responses
+    waitUntilCompleted();
   }
 
   /**
    * This class is used to aggregate PlanNode of the same datanode into one 
rpc. In order to ensure
-   * the one-to-one correspondence between response and request, the 
corresponding index needs to be
-   * recorded.
+   * the one-to-one correspondence between response and request, the corresponding index needs to
+   * be recorded.
    */
   static class BatchRequestWithIndex {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
index 63af1ffedb9..dea6a162655 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -20,22 +20,27 @@
 package org.apache.iotdb.db.queryengine.plan.scheduler;
 
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
 import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.async.AsyncMethodCallback;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.iotdb.commons.client.ThriftClient.isConnectionBroken;
+
 public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchPlanNodeResp> {
 
   private final List<Integer> instanceIds;
   private final AtomicLong pendingNumber;
   private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap;
+  private final List<Integer> needRetryInstanceIndex;
   private final long sendTime;
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
@@ -44,17 +49,23 @@ public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchP
       List<Integer> instanceIds,
       AtomicLong pendingNumber,
       Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap,
+      List<Integer> needRetryInstanceIndex,
       long sendTime) {
     this.instanceIds = instanceIds;
     this.pendingNumber = pendingNumber;
     this.instanceId2RespMap = instanceId2RespMap;
+    this.needRetryInstanceIndex = needRetryInstanceIndex;
     this.sendTime = sendTime;
   }
 
   @Override
   public void onComplete(TSendBatchPlanNodeResp sendBatchPlanNodeResp) {
     for (int i = 0; i < sendBatchPlanNodeResp.getResponses().size(); i++) {
-      instanceId2RespMap.put(instanceIds.get(i), 
sendBatchPlanNodeResp.getResponses().get(i));
+      TSendSinglePlanNodeResp singlePlanNodeResp = 
sendBatchPlanNodeResp.getResponses().get(i);
+      instanceId2RespMap.put(instanceIds.get(i), singlePlanNodeResp);
+      if (needRetry(singlePlanNodeResp)) {
+        needRetryInstanceIndex.add(instanceIds.get(i));
+      }
     }
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() 
- sendTime);
@@ -66,6 +77,9 @@ public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchP
 
   @Override
   public void onError(Exception e) {
+    if (needRetry(e)) {
+      needRetryInstanceIndex.addAll(instanceIds);
+    }
     TSendSinglePlanNodeResp resp = new TSendSinglePlanNodeResp();
     String errorMsg = String.format("Fail to send plan node, exception 
message: %s", e);
     resp.setAccepted(false);
@@ -80,4 +94,15 @@ public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchP
       }
     }
   }
+
+  private boolean needRetry(Exception e) {
+    Throwable rootCause = ExceptionUtils.getRootCause(e);
+    // if the root cause indicates a broken connection (e.g. SocketException "Broken pipe" or the
+    // socket was closed by the peer), the remote node may have gone offline, so this FI is worth
+    // retrying on another replica
+    return isConnectionBroken(rootCause);
+  }
+
+  private boolean needRetry(TSendSinglePlanNodeResp resp) {
+    return DispatchLogHandler.needRetry(resp.status.code);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 2e8929402d1..0e698c7b81c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -57,6 +57,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -210,6 +211,25 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     // wait until remote dispatch done
     try {
       asyncPlanNodeSender.waitUntilCompleted();
+
+      if (asyncPlanNodeSender.needRetry()) {
+        // retry failed remote FIs
+        int retry = 0;
+        final int maxRetryTimes = 10;
+        long waitMillis = getRetrySleepTime(retry);
+
+        while (asyncPlanNodeSender.needRetry()) {
+          retry++;
+          asyncPlanNodeSender.retry();
+          if (!(asyncPlanNodeSender.needRetry() && retry < maxRetryTimes)) {
+            break;
+          }
+          // still need to retry, sleep some time before making another retry.
+          Thread.sleep(waitMillis);
+          waitMillis = getRetrySleepTime(retry);
+        }
+      }
+
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       logger.error("Interrupted when dispatching write async", e);
@@ -239,6 +259,12 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     }
   }
 
+  private long getRetrySleepTime(int retryTimes) {
+    return Math.min(
+        (long) (TimeUnit.MILLISECONDS.toMillis(100) * Math.pow(2, retryTimes)),
+        TimeUnit.SECONDS.toMillis(20));
+  }
+
   private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
     TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index e9d3b9c36ce..f57015cfab3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -102,7 +102,7 @@ public interface ThriftClient {
    * @param cause Throwable
    * @return true/false
    */
-  static boolean isConnectionBroken(Throwable cause) {
+  public static boolean isConnectionBroken(Throwable cause) {
     return (cause instanceof SocketException && 
cause.getMessage().contains("Broken pipe"))
         || (cause instanceof TTransportException
             && cause.getMessage().contains("Socket is closed by peer"));

Reply via email to