This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/write_opt_version1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e84d20e81b33d5bb28951488bd5a95488b88afc3 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Jun 20 20:29:20 2022 +0800 stable version but has getStatus() issue --- .../org/apache/iotdb/db/mpp/plan/Coordinator.java | 48 ++++---- .../org/apache/iotdb/db/mpp/plan/StepTracker.java | 73 ++++++++++++ .../db/mpp/plan/execution/QueryExecution.java | 130 ++++++++++++--------- .../planner/distribution/DistributionPlanner.java | 3 - .../db/mpp/plan/scheduler/ClusterScheduler.java | 4 +- .../db/mpp/plan/scheduler/StandaloneScheduler.java | 1 - 6 files changed, 176 insertions(+), 83 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java index 111a9222a6..495af1f3e0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java @@ -109,29 +109,33 @@ public class Coordinator { String sql, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { - - QueryId globalQueryId = queryIdGenerator.createNextQueryId(); - try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) { - if (sql != null) { - LOGGER.info("start executing sql: {}", sql); - } - IQueryExecution execution = - createQueryExecution( - statement, - new MPPQueryContext( - sql, - globalQueryId, - session, - DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT, - DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT), - partitionFetcher, - schemaFetcher); - if (execution.isQuery()) { - queryExecutionMap.put(queryId, execution); + long startTime = System.nanoTime(); + try { + QueryId globalQueryId = queryIdGenerator.createNextQueryId(); + try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) { + // if (sql != null) { + // LOGGER.info("start executing sql: {}", sql); + // } + IQueryExecution execution = + createQueryExecution( + statement, + new MPPQueryContext( + sql, + globalQueryId, + session, + DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT, + DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT), + partitionFetcher, + schemaFetcher); + if (execution.isQuery()) { + queryExecutionMap.put(queryId, execution); + } + execution.start(); + + return execution.getStatus(); } - execution.start(); - - return execution.getStatus(); + } finally { + StepTracker.trace("statementExec", startTime, System.nanoTime()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java new file mode 100644 index 0000000000..1fe7df7be0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +class Metric { + private static final Logger logger = LoggerFactory.getLogger(Metric.class); + public String stepName; + public long invokeCount; + public long totalTime; + public long last100Time; + + public Metric(String stepName) { + this.stepName = stepName; + this.invokeCount = 0; + this.totalTime = 0; + this.last100Time = 0; + } + + public void trace(long startTime, long endTime) { + this.invokeCount++; + this.totalTime += (endTime - startTime); + } + + public void tryPrint() { + if (invokeCount % 100 == 0) { + logger.info( + String.format( + "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last100AVG: %fms", + Thread.currentThread().getId(), + stepName, + invokeCount, + totalTime * 1.0 / 1000000, + totalTime * 1.0 / 1000000 / invokeCount, + (totalTime * 1.0 - last100Time) / 1000000 / 100)); + last100Time = totalTime; + } + } +} + +public class StepTracker { + private static final ThreadLocal<Map<String, Metric>> metrics = new ThreadLocal<>(); + + public static void trace(String stepName, long startTime, long endTime) { + if (metrics.get() == null) { + metrics.set(new HashMap<>()); + } + metrics.get().computeIfAbsent(stepName, Metric::new).trace(startTime, endTime); + metrics.get().get(stepName).tryPrint(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 5f4ce1ae3f..a7a39564ef 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.execution.QueryState; import org.apache.iotdb.db.mpp.execution.QueryStateMachine; import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService; import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle; +import org.apache.iotdb.db.mpp.plan.StepTracker; import org.apache.iotdb.db.mpp.plan.analyze.Analysis; import org.apache.iotdb.db.mpp.plan.analyze.Analyzer; import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; @@ -46,7 +47,6 @@ import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner; import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler; import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler; import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler; @@ -113,6 +113,8 @@ public class QueryExecution implements IQueryExecution { private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager; + private long queryExecutionStartTime; + public QueryExecution( Statement statement, MPPQueryContext context, @@ -122,12 +124,15 @@ public class QueryExecution implements IQueryExecution { IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) { + this.queryExecutionStartTime = System.nanoTime(); this.executor = executor; this.writeOperationExecutor = writeOperationExecutor; this.scheduledExecutor = scheduledExecutor; this.context = context; this.planOptimizers = new ArrayList<>(); + long startTime = System.nanoTime(); this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher); + StepTracker.trace("analyze", startTime, System.nanoTime()); this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); this.partitionFetcher = partitionFetcher; this.schemaFetcher = schemaFetcher; @@ -189,52 +194,59 @@ public class QueryExecution implements IQueryExecution { IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { // initialize the variable `analysis` - logger.info("start to analyze query"); + // logger.info("start to analyze query"); return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement); } private void schedule() { - // TODO: (xingtanzjr) initialize the query scheduler according to configuration - this.scheduler = - config.isClusterMode() - ? new ClusterScheduler( - context, - stateMachine, - distributedPlan.getInstances(), - context.getQueryType(), - executor, - writeOperationExecutor, - scheduledExecutor, - internalServiceClientManager) - : new StandaloneScheduler( - context, - stateMachine, - distributedPlan.getInstances(), - context.getQueryType(), - executor, - scheduledExecutor, - internalServiceClientManager); - this.scheduler.start(); + long startTime = System.nanoTime(); + try { + // TODO: (xingtanzjr) initialize the query scheduler according to configuration + this.scheduler = + config.isClusterMode() + ? new ClusterScheduler( + context, + stateMachine, + distributedPlan.getInstances(), + context.getQueryType(), + executor, + writeOperationExecutor, + scheduledExecutor, + internalServiceClientManager) + : new StandaloneScheduler( + context, + stateMachine, + distributedPlan.getInstances(), + context.getQueryType(), + executor, + scheduledExecutor, + internalServiceClientManager); + this.scheduler.start(); + } finally { + StepTracker.trace("dispatch", startTime, System.nanoTime()); + } } // Use LogicalPlanner to do the logical query plan and logical optimization public void doLogicalPlan() { - logger.info("do logical plan..."); - LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers); - this.logicalPlan = planner.plan(this.analysis); - logger.info( - "logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode())); + long startTime = System.nanoTime(); + try { + LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers); + this.logicalPlan = planner.plan(this.analysis); + } finally { + StepTracker.trace("doLogicalPlan", startTime, System.nanoTime()); + } } // Generate the distributed plan and split it into fragments public void doDistributedPlan() { - logger.info("do distribution plan..."); - DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan); - this.distributedPlan = planner.planFragments(); - logger.info( - "distribution plan done. Fragment instance count is {}, details is: \n {}", - distributedPlan.getInstances().size(), - printFragmentInstances(distributedPlan.getInstances())); + long startTime = System.nanoTime(); + try { + DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan); + this.distributedPlan = planner.planFragments(); + } finally { + StepTracker.trace("doDistributionPlan", startTime, System.nanoTime()); + } } private String printFragmentInstances(List<FragmentInstance> instances) { @@ -332,28 +344,36 @@ public class QueryExecution implements IQueryExecution { * @return ExecutionStatus. Contains the QueryId and the TSStatus. */ public ExecutionResult getStatus() { - // Although we monitor the state to transition to RUNNING, the future will return if any - // Terminated state is triggered - SettableFuture<QueryState> future = SettableFuture.create(); - stateMachine.addStateChangeListener( - state -> { - if (state == QueryState.RUNNING || state.isDone()) { - future.set(state); - } - }); - + long startTime = System.nanoTime(); try { - QueryState state = future.get(); - // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED - return getExecutionResult(state); - } catch (InterruptedException | ExecutionException e) { - // TODO: (xingtanzjr) use more accurate error handling - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + // Although we monitor the state to transition to RUNNING, the future will return if any + // Terminated state is triggered + try { + SettableFuture<QueryState> future = SettableFuture.create(); + stateMachine.addStateChangeListener( + state -> { + if (state == QueryState.RUNNING || state.isDone()) { + future.set(state); + } + }); + long startTimeGet = System.nanoTime(); + QueryState state = future.get(); + StepTracker.trace("get()", startTimeGet, System.nanoTime()); + // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED + return getExecutionResult(state); + } catch (InterruptedException | ExecutionException e) { + // TODO: (xingtanzjr) use more accurate error handling + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + return new ExecutionResult( + context.getQueryId(), + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())); } - return new ExecutionResult( - context.getQueryId(), - RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())); + } finally { + StepTracker.trace("getStatus", startTime, System.nanoTime()); + StepTracker.trace("QueryExecutionLifeCycle", this.queryExecutionStartTime, System.nanoTime()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java index 421de275da..4dfe1b968f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java @@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode; @@ -67,9 +66,7 @@ public class DistributionPlanner { public DistributedQueryPlan planFragments() { PlanNode rootAfterRewrite = rewriteSource(); - System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite)); PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite); - System.out.println(PlanNodeUtil.nodeToString(rootWithExchange)); if (analysis.getStatement() instanceof QueryStatement) { analysis .getRespDatasetHeader() diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index 9437c22536..8390ac6fe5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -86,7 +86,7 @@ public class ClusterScheduler implements IScheduler { @Override public void start() { stateMachine.transitionToDispatching(); - logger.info("transit to DISPATCHING"); + // logger.info("transit to DISPATCHING"); Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances); // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect. @@ -116,7 +116,7 @@ public class ClusterScheduler implements IScheduler { // The FragmentInstances has been dispatched successfully to corresponding host, we mark the // QueryState to Running stateMachine.transitionToRunning(); - logger.info("transit to RUNNING"); + // logger.info("transit to RUNNING"); instances.forEach( instance -> { stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java index 89bacfb562..5433b07c7c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java @@ -98,7 +98,6 @@ public class StandaloneScheduler implements IScheduler { @Override public void start() { stateMachine.transitionToDispatching(); - LOGGER.info("{} transit to DISPATCHING", getLogHeader()); // For the FragmentInstance of WRITE, it will be executed directly when dispatching. // TODO: Other QueryTypes switch (queryType) {
