This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/ca-improve in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6adf50f1c8b851c70c83bcc3847c8334a35edda1 Author: JackieTien97 <[email protected]> AuthorDate: Fri Jul 28 12:07:44 2023 +0800 Add semaphere --- .../scheduler/FragmentInstanceDispatcherImpl.java | 46 +++++++++++++++------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 2e8929402d1..86abdaa8535 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -57,6 +57,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ; @@ -85,6 +86,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { private static final String UNEXPECTED_ERRORS = "Unexpected errors: "; + private static final Semaphore SEMAPHORE = new Semaphore(48); + public FragmentInstanceDispatcherImpl( QueryType type, MPPQueryContext queryContext, @@ -189,24 +192,37 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances); asyncPlanNodeSender.sendAll(); - if (!localInstances.isEmpty()) { - // sync dispatch to local - long localScheduleStartTime = System.nanoTime(); - for (FragmentInstance localInstance : localInstances) { - try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) { - dispatchLocally(localInstance); - } catch (FragmentInstanceDispatchException e) { - dataNodeFailureList.add(e.getFailureStatus()); - } catch (Throwable t) { - logger.warn(DISPATCH_FAILED, t); - dataNodeFailureList.add( - RpcUtils.getStatus( - TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage())); + try { + SEMAPHORE.acquire(); + if (!localInstances.isEmpty()) { + // sync dispatch to local + long localScheduleStartTime = System.nanoTime(); + for (FragmentInstance localInstance : localInstances) { + try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) { + dispatchLocally(localInstance); + } catch (FragmentInstanceDispatchException e) { + dataNodeFailureList.add(e.getFailureStatus()); + } catch (Throwable t) { + logger.warn(DISPATCH_FAILED, t); + dataNodeFailureList.add( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage())); + } } + PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost( + System.nanoTime() - localScheduleStartTime); } - PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost( - System.nanoTime() - localScheduleStartTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted when acquiring token"); + return immediateFuture( + new FragInstanceDispatchResult( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + e.getMessage()))); + } finally { + SEMAPHORE.release(); } + // wait until remote dispatch done try { asyncPlanNodeSender.waitUntilCompleted();
