This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ca-improve
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6adf50f1c8b851c70c83bcc3847c8334a35edda1
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Jul 28 12:07:44 2023 +0800

    Add semaphore
---
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 46 +++++++++++++++-------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 2e8929402d1..86abdaa8535 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -57,6 +57,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -85,6 +86,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
 
   private static final String UNEXPECTED_ERRORS = "Unexpected errors: ";
 
+  private static final Semaphore SEMAPHORE = new Semaphore(48);
+
   public FragmentInstanceDispatcherImpl(
       QueryType type,
       MPPQueryContext queryContext,
@@ -189,24 +192,37 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
         new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances);
     asyncPlanNodeSender.sendAll();
 
-    if (!localInstances.isEmpty()) {
-      // sync dispatch to local
-      long localScheduleStartTime = System.nanoTime();
-      for (FragmentInstance localInstance : localInstances) {
-        try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
-          dispatchLocally(localInstance);
-        } catch (FragmentInstanceDispatchException e) {
-          dataNodeFailureList.add(e.getFailureStatus());
-        } catch (Throwable t) {
-          logger.warn(DISPATCH_FAILED, t);
-          dataNodeFailureList.add(
-              RpcUtils.getStatus(
-                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+    try {
+      SEMAPHORE.acquire();
+      if (!localInstances.isEmpty()) {
+        // sync dispatch to local
+        long localScheduleStartTime = System.nanoTime();
+        for (FragmentInstance localInstance : localInstances) {
+          try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
+            dispatchLocally(localInstance);
+          } catch (FragmentInstanceDispatchException e) {
+            dataNodeFailureList.add(e.getFailureStatus());
+          } catch (Throwable t) {
+            logger.warn(DISPATCH_FAILED, t);
+            dataNodeFailureList.add(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+          }
         }
+        PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost(
+            System.nanoTime() - localScheduleStartTime);
       }
-      PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost(
-          System.nanoTime() - localScheduleStartTime);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.error("Interrupted when acquiring token");
+      return immediateFuture(
+          new FragInstanceDispatchResult(
+              RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + e.getMessage())));
+    } finally {
+      SEMAPHORE.release();
     }
+
     // wait until remote dispatch done
     try {
       asyncPlanNodeSender.waitUntilCompleted();

Reply via email to