This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/write_opt_version1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 22ee02fe43b9deeb0c668e03a2e22024be4eed66 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Jun 20 22:58:35 2022 +0800 fix the issue in queryTerminator --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +- .../iotdb/db/mpp/execution/StateMachine.java | 7 ++- .../org/apache/iotdb/db/mpp/plan/Coordinator.java | 2 +- .../org/apache/iotdb/db/mpp/plan/StepTracker.java | 15 +++-- .../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 11 ++++ .../db/mpp/plan/execution/QueryExecution.java | 5 ++ .../db/mpp/plan/scheduler/ClusterScheduler.java | 30 ++++++---- .../mpp/plan/scheduler/SimpleQueryTerminator.java | 65 +++++++++++++--------- .../db/mpp/plan/scheduler/StandaloneScheduler.java | 6 +- 9 files changed, 98 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 5de782e678..1ca1c2c794 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -928,10 +928,10 @@ public class IoTDBConfig { private int triggerForwardMQTTPoolSize = 4; /** ThreadPool size for read operation in coordinator */ - private int coordinatorReadExecutorSize = 100; + private int coordinatorReadExecutorSize = 50; /** ThreadPool size for write operation in coordinator */ - private int coordinatorWriteExecutorSize = 100; + private int coordinatorWriteExecutorSize = 50; IoTDBConfig() {} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java index c1b7e76360..e53c2cb02d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java @@ -13,6 +13,8 @@ */ package org.apache.iotdb.db.mpp.execution; +import org.apache.iotdb.db.mpp.plan.StepTracker; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -270,6 +272,7 @@ public class StateMachine<T> { * execution. The listener is notified immediately of the current state. */ public void addStateChangeListener(StateChangeListener<T> stateChangeListener) { + long startTime = System.nanoTime(); requireNonNull(stateChangeListener, "stateChangeListener is null"); boolean inTerminalState; @@ -281,7 +284,7 @@ public class StateMachine<T> { stateChangeListeners.add(stateChangeListener); } } - + StepTracker.trace("addStateListener", startTime, System.nanoTime()); // fire state change listener with the current state // always fire listener callbacks from a different thread safeExecute(() -> stateChangeListener.stateChanged(currentState)); @@ -310,6 +313,8 @@ public class StateMachine<T> { private void safeExecute(Runnable command) { try { + // new Thread(command).start(); + // command.run(); executor.execute(command); } catch (RejectedExecutionException e) { if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java index 495af1f3e0..f9a66b7ec1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java @@ -57,7 +57,7 @@ public class Coordinator { private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator"; private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite"; private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled"; - private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1; + private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10; private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> INTERNAL_SERVICE_CLIENT_MANAGER = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java index 1fe7df7be0..9d3da35f72 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java @@ -27,16 +27,18 @@ import java.util.Map; class Metric { private static final Logger logger = LoggerFactory.getLogger(Metric.class); + private static final int PRINT_RATE = 100; + public String stepName; public long invokeCount; public long totalTime; - public long last100Time; + public long lastCycleTime; public Metric(String stepName) { this.stepName = stepName; this.invokeCount = 0; this.totalTime = 0; - this.last100Time = 0; + this.lastCycleTime = 0; } public void trace(long startTime, long endTime) { @@ -45,17 +47,18 @@ class Metric { } public void tryPrint() { - if (invokeCount % 100 == 0) { + if (invokeCount % PRINT_RATE == 0) { logger.info( String.format( - "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last100AVG: %fms", + "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last%dAVG: %fms", Thread.currentThread().getId(), stepName, invokeCount, totalTime * 1.0 / 1000000, totalTime * 1.0 / 1000000 / invokeCount, - (totalTime * 1.0 - last100Time) / 1000000 / 100)); - last100Time = totalTime; + PRINT_RATE, + (totalTime * 1.0 - lastCycleTime) / 1000000 / PRINT_RATE)); + lastCycleTime = totalTime; } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java index 84db6df7f7..9f72dc4802 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java @@ -96,10 +96,21 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { schemaFetchStatement.setSchemaPartition(schemaPartition); SchemaTree result = executeSchemaFetchQuery(schemaFetchStatement); + // SchemaTree result = mockFetch(patternTree); result.setStorageGroups(storageGroups); return result; } + public SchemaTree mockFetch(PathPatternTree patternTree) { + SchemaTree tree = new SchemaTree(); + List<PartialPath> paths = patternTree.getAllPathPatterns(); + for (PartialPath path : paths) { + tree.appendSingleMeasurement( + path, new MeasurementSchema(path.getMeasurement(), TSDataType.FLOAT), null, false); + } + return tree; + } + private SchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) { long queryId = SessionManager.getInstance().requestQueryId(false); try { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index a7a39564ef..d2d8b1c3dd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -349,9 +349,14 @@ public class QueryExecution implements IQueryExecution { // Although we monitor the state to transition to RUNNING, the future will return if any // Terminated state is triggered try { + if (stateMachine.getState() == QueryState.FINISHED) { + return getExecutionResult(QueryState.FINISHED); + } SettableFuture<QueryState> future = SettableFuture.create(); + final long addStart = System.nanoTime(); stateMachine.addStateChangeListener( state -> { + StepTracker.trace("stateQueue", addStart, System.nanoTime()); if (state == QueryState.RUNNING || state.isDone()) { future.set(state); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index 8390ac6fe5..989b025660 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -57,8 +57,8 @@ public class ClusterScheduler implements IScheduler { private final List<FragmentInstance> instances; private final IFragInstanceDispatcher dispatcher; - private final IFragInstanceStateTracker stateTracker; - private final IQueryTerminator queryTerminator; + private IFragInstanceStateTracker stateTracker; + private IQueryTerminator queryTerminator; public ClusterScheduler( MPPQueryContext queryContext, @@ -75,12 +75,18 @@ public class ClusterScheduler implements IScheduler { this.dispatcher = new FragmentInstanceDispatcherImpl( queryType, executor, writeOperationExecutor, internalServiceClientManager); - this.stateTracker = - new FixedRateFragInsStateTracker( - stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager); - this.queryTerminator = - new SimpleQueryTerminator( - executor, queryContext.getQueryId(), instances, internalServiceClientManager); + if (queryType == QueryType.READ) { + this.stateTracker = + new FixedRateFragInsStateTracker( + stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager); + this.queryTerminator = + new SimpleQueryTerminator( + executor, + scheduledExecutor, + queryContext.getQueryId(), + instances, + internalServiceClientManager); + } } @Override @@ -132,9 +138,13 @@ public class ClusterScheduler implements IScheduler { // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best // practice ? dispatcher.abort(); - stateTracker.abort(); + if (stateTracker != null) { + stateTracker.abort(); + } // TODO: (xingtanzjr) handle the exception when the termination cannot succeed - queryTerminator.terminate(); + if (queryTerminator != null) { + queryTerminator.terminate(); + } } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java index 357c26fcf5..55e6047555 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java @@ -32,67 +32,80 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class SimpleQueryTerminator implements IQueryTerminator { private static final Logger logger = LoggerFactory.getLogger(SimpleQueryTerminator.class); private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L; private final ExecutorService executor; + protected ScheduledExecutorService scheduledExecutor; private final QueryId queryId; - private final List<FragmentInstance> fragmentInstances; + private List<TEndPoint> relatedHost; + private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance; private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager; public SimpleQueryTerminator( ExecutorService executor, + ScheduledExecutorService scheduledExecutor, QueryId queryId, List<FragmentInstance> fragmentInstances, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) { this.executor = executor; + this.scheduledExecutor = scheduledExecutor; this.queryId = queryId; - this.fragmentInstances = fragmentInstances; this.internalServiceClientManager = internalServiceClientManager; + calculateParameter(fragmentInstances); + } + + private void calculateParameter(List<FragmentInstance> instances) { + this.relatedHost = getRelatedHost(instances); + this.ownedFragmentInstance = new HashMap<>(); + for (TEndPoint endPoint : relatedHost) { + ownedFragmentInstance.put(endPoint, getRelatedFragmentInstances(endPoint, instances)); + } } @Override public Future<Boolean> terminate() { - List<TEndPoint> relatedHost = getRelatedHost(); - try { - Thread.sleep(TERMINATION_GRACE_PERIOD_IN_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + return scheduledExecutor.schedule( + this::syncTerminate, TERMINATION_GRACE_PERIOD_IN_MS, TimeUnit.MILLISECONDS); + } + + public Boolean syncTerminate() { + for (TEndPoint endPoint : relatedHost) { + // TODO (jackie tien) change the port + try (SyncDataNodeInternalServiceClient client = + internalServiceClientManager.borrowClient(endPoint)) { + client.cancelQuery( + new TCancelQueryReq(queryId.getId(), ownedFragmentInstance.get(endPoint))); + } catch (IOException e) { + logger.error("can't connect to node {}", endPoint, e); + return false; + } catch (TException e) { + return false; + } } - return executor.submit( - () -> { - for (TEndPoint endPoint : relatedHost) { - // TODO (jackie tien) change the port - try (SyncDataNodeInternalServiceClient client = - internalServiceClientManager.borrowClient(endPoint)) { - client.cancelQuery( - new TCancelQueryReq(queryId.getId(), getRelatedFragmentInstances(endPoint))); - } catch (IOException e) { - logger.error("can't connect to node {}", endPoint, e); - return false; - } catch (TException e) { - return false; - } - } - return true; - }); + return true; } - private List<TEndPoint> getRelatedHost() { + private List<TEndPoint> getRelatedHost(List<FragmentInstance> fragmentInstances) { return fragmentInstances.stream() .map(instance -> instance.getHostDataNode().internalEndPoint) .distinct() .collect(Collectors.toList()); } - private List<TFragmentInstanceId> getRelatedFragmentInstances(TEndPoint endPoint) { + private List<TFragmentInstanceId> getRelatedFragmentInstances( + TEndPoint endPoint, List<FragmentInstance> fragmentInstances) { return fragmentInstances.stream() .filter(instance -> instance.getHostDataNode().internalEndPoint.equals(endPoint)) .map(instance -> instance.getId().toThrift()) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java index 5433b07c7c..f4b7664c57 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java @@ -91,7 +91,11 @@ public class StandaloneScheduler implements IScheduler { stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager); this.queryTerminator = new SimpleQueryTerminator( - executor, queryContext.getQueryId(), instances, internalServiceClientManager); + executor, + scheduledExecutor, + queryContext.getQueryId(), + instances, + internalServiceClientManager); } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
