This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch RemoveTimeSliceAllocator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9fa2805b6b1457b16ff763e1cecaf15ef22868f9 Author: JackieTien97 <[email protected]> AuthorDate: Fri Nov 17 16:50:20 2023 +0800 Remove wrong TimeSliceAllocator --- .../execution/driver/DriverContext.java | 8 --- .../execution/operator/OperatorContext.java | 12 +++- .../execution/timer/ITimeSliceAllocator.java | 29 --------- .../timer/RuleBasedTimeSliceAllocator.java | 69 ---------------------- .../plan/planner/LocalExecutionPlanContext.java | 11 ---- .../plan/planner/OperatorTreeGenerator.java | 66 --------------------- 6 files changed, 9 insertions(+), 186 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java index 5623e8a3439..79cb4acf3b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java @@ -25,7 +25,6 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTaskId; -import org.apache.iotdb.db.queryengine.execution.timer.RuleBasedTimeSliceAllocator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import java.util.ArrayList; @@ -39,7 +38,6 @@ public class DriverContext { private final FragmentInstanceContext fragmentInstanceContext; private final List<OperatorContext> operatorContexts = new ArrayList<>(); private ISink sink; - private final RuleBasedTimeSliceAllocator timeSliceAllocator; private int dependencyDriverIndex = -1; private ExchangeOperator downstreamOperator; @@ -50,13 +48,11 @@ public class DriverContext { @TestOnly public DriverContext() { this.fragmentInstanceContext = null; - this.timeSliceAllocator = null; } public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) { this.fragmentInstanceContext = fragmentInstanceContext; this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId); - this.timeSliceAllocator = new RuleBasedTimeSliceAllocator(); } public OperatorContext addOperatorContext( @@ -108,10 +104,6 @@ public class DriverContext { return operatorContexts; } - public RuleBasedTimeSliceAllocator getTimeSliceAllocator() { - return timeSliceAllocator; - } - public int getPipelineId() { return driverTaskID.getPipelineId(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index 4a9b4c0a6ea..709e34fb0eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.execution.operator; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; @@ -28,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import io.airlift.units.Duration; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * Contains information about {@link Operator} execution. @@ -36,12 +38,16 @@ import java.util.Objects; */ public class OperatorContext { + private static Duration maxRunTime = + new Duration( + IoTDBDescriptor.getInstance().getConfig().getDriverTaskExecutionTimeSliceInMs(), + TimeUnit.MILLISECONDS); + private final int operatorId; // It seems it's never used. private final PlanNodeId planNodeId; private final String operatorType; private DriverContext driverContext; - private Duration maxRunTime; private long totalExecutionTimeInNanos = 0L; private long nextCalledCount = 0L; @@ -90,8 +96,8 @@ public class OperatorContext { return maxRunTime; } - public void setMaxRunTime(Duration maxRunTime) { - this.maxRunTime = maxRunTime; + public static void setMaxRunTime(Duration maxRunTime) { + OperatorContext.maxRunTime = maxRunTime; } public SessionInfo getSessionInfo() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java deleted file mode 100644 index ef2788e957d..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.execution.timer; - -import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; - -import io.airlift.units.Duration; - -public interface ITimeSliceAllocator { - - Duration getMaxRunTime(OperatorContext operatorContext); -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java deleted file mode 100644 index e934ebba463..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.execution.timer; - -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; - -import io.airlift.units.Duration; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkState; - -public class RuleBasedTimeSliceAllocator implements ITimeSliceAllocator { - - private static final long EXECUTION_TIME_SLICE_IN_MS = - new Duration( - IoTDBDescriptor.getInstance().getConfig().getDriverTaskExecutionTimeSliceInMs(), - TimeUnit.MILLISECONDS) - .roundTo(TimeUnit.MILLISECONDS); - - private final Map<OperatorContext, Integer> operatorToWeightMap; - - private int totalWeight; - - public RuleBasedTimeSliceAllocator() { - this.operatorToWeightMap = new HashMap<>(); - this.totalWeight = 0; - } - - public void recordExecutionWeight(OperatorContext operatorContext, int weight) { - checkState( - !operatorToWeightMap.containsKey(operatorContext), "Same operator has been weighted"); - operatorToWeightMap.put(operatorContext, weight); - totalWeight += weight; - } - - private int getWeight(OperatorContext operatorContext) { - checkState( - operatorToWeightMap.containsKey(operatorContext), "This operator has not been weighted"); - return operatorToWeightMap.get(operatorContext); - } - - @Override - public Duration getMaxRunTime(OperatorContext operatorContext) { - return new Duration( - (double) EXECUTION_TIME_SLICE_IN_MS * getWeight(operatorContext) / totalWeight, - TimeUnit.MILLISECONDS); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java index fc190a7ae03..018789b64f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java @@ -28,7 +28,6 @@ import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; -import org.apache.iotdb.db.queryengine.execution.timer.RuleBasedTimeSliceAllocator; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; @@ -134,12 +133,6 @@ public class LocalExecutionPlanContext { public void addPipelineDriverFactory( Operator operation, DriverContext driverContext, long estimatedMemorySize) { - driverContext - .getOperatorContexts() - .forEach( - operatorContext -> - operatorContext.setMaxRunTime( - driverContext.getTimeSliceAllocator().getMaxRunTime(operatorContext))); pipelineDriverFactories.add( new PipelineDriverFactory(operation, driverContext, estimatedMemorySize)); } @@ -264,10 +257,6 @@ public class LocalExecutionPlanContext { return typeProvider; } - public RuleBasedTimeSliceAllocator getTimeSliceAllocator() { - return driverContext.getTimeSliceAllocator(); - } - public FragmentInstanceContext getInstanceContext() { return driverContext.getFragmentInstanceContext(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index bc70aff0fe9..b97dee886e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -323,7 +323,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); context.getDriverContext().setInputDriver(true); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return seriesScanOperator; } @@ -363,9 +362,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); context.getDriverContext().setInputDriver(true); - context - .getTimeSliceAllocator() - .recordExecutionWeight(operatorContext, seriesPath.getColumnNum()); return seriesScanOperator; } @@ -429,7 +425,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggregateScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); context.getDriverContext().setInputDriver(true); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return aggregateScanOperator; } @@ -522,7 +517,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP .addSourceOperator(seriesAggregationScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); context.getDriverContext().setInputDriver(true); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return seriesAggregationScanOperator; } @@ -540,7 +534,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), SchemaQueryOrderByHeatOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaQueryOrderByHeatOperator(operatorContext, children); } @@ -577,7 +570,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaQueryScanOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaQueryScanOperator<>( node.getPlanNodeId(), operatorContext, @@ -601,7 +593,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaQueryScanOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaQueryScanOperator<>( node.getPlanNodeId(), operatorContext, @@ -626,7 +617,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaQueryMergeOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaQueryMergeOperator(operatorContext, children); } @@ -640,7 +630,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), CountMergeOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); if (node.getChildren().get(0) instanceof LevelTimeSeriesCountNode) { return new CountGroupByLevelMergeOperator(operatorContext, children); } else { @@ -657,7 +646,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaCountOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaCountOperator<>( node.getPlanNodeId(), operatorContext, @@ -675,7 +663,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaCountOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaCountOperator<>( node.getPlanNodeId(), operatorContext, @@ -697,7 +684,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), CountGroupByLevelScanOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new CountGroupByLevelScanOperator<>( node.getPlanNodeId(), operatorContext, @@ -720,7 +706,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaQueryScanOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaQueryScanOperator<>( node.getPlanNodeId(), operatorContext, @@ -739,7 +724,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), NodeManageMemoryMergeOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new NodeManageMemoryMergeOperator(operatorContext, node.getData(), child); } @@ -754,7 +738,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), NodePathsConvertOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new NodePathsConvertOperator(operatorContext, child); } @@ -768,7 +751,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), NodePathsCountOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new NodePathsCountOperator(operatorContext, child); } @@ -791,7 +773,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP if (outputColumnTypes == null || outputColumnTypes.isEmpty()) { throw new IllegalStateException("OutputColumTypes should not be null/empty"); } - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SingleDeviceViewOperator( operatorContext, node.getDevice(), child, deviceColumnIndex, outputColumnTypes); } @@ -812,7 +793,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP .collect(Collectors.toList()); List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new DeviceViewOperator( operatorContext, node.getDevices(), children, deviceColumnIndex, outputColumnTypes); } @@ -830,7 +810,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.setCachedDataTypes(dataTypes); List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context); List<SortItem> sortItemList = node.getMergeOrderParameter().getSortItemList(); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size()); List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size()); @@ -860,7 +839,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.setCachedDataTypes(dataTypes); List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context); List<SortItem> sortItemList = node.getMergeOrderParameter().getSortItemList(); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size()); List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size()); @@ -936,11 +914,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP switch (fillPolicy) { case VALUE: Literal literal = descriptor.getFillValue(); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new FillOperator( operatorContext, getConstantFill(inputColumns, inputDataTypes, literal), child); case PREVIOUS: - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new FillOperator( operatorContext, getPreviousFill( @@ -955,7 +931,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP .getZoneId())), child); case LINEAR: - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new LinearFillOperator( operatorContext, getLinearFill(inputColumns, inputDataTypes), child); default: @@ -1111,8 +1086,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, projectExpression); } - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - boolean hasNonMappableUDF = false; for (Expression expression : projectExpressions) { if (!expression.isMappable(expressionTypes)) { @@ -1203,7 +1176,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), FilterAndProjectOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); for (Expression projectExpression : projectExpressions) { ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, projectExpression); @@ -1305,7 +1277,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), TransformOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(transformContext, 1); return new TransformOperator( transformContext, filter, @@ -1365,7 +1336,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP calculateMaxAggregationResultSize( aggregationDescriptors, timeRangeIterator, context.getTypeProvider()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return new AggregationOperator( operatorContext, aggregators, timeRangeIterator, children, maxReturnSize); } @@ -1428,7 +1398,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), TagAggregationOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregatorCount); long maxReturnSize = calculateMaxAggregationResultSize( aggregationDescriptors, timeRangeIterator, context.getTypeProvider()); @@ -1478,7 +1447,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP calculateMaxAggregationResultSize( aggregationDescriptors, timeRangeIterator, context.getTypeProvider()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return new SlidingWindowAggregationOperator( operatorContext, aggregators, @@ -1500,7 +1468,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), LimitOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new LimitOperator(operatorContext, node.getLimit(), child); } @@ -1515,7 +1482,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), OffsetOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new OffsetOperator(operatorContext, node.getOffset(), child); } @@ -1558,7 +1524,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), RawDataAggregationOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); ITimeRangeIterator timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true); @@ -1660,7 +1625,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP calculateMaxAggregationResultSize( aggregationDescriptors, timeRangeIterator, context.getTypeProvider()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return new AggregationOperator( operatorContext, aggregators, timeRangeIterator, children, maxReturnSize); } @@ -1702,7 +1666,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); List<SortItem> sortItemList = node.getOrderByParameter().getSortItemList(); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size()); List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemList.size()); @@ -1760,8 +1723,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP intoPathDescriptor.getTargetPathToDataTypeMap(); long statementSizePerLine = calculateStatementSizePerLine(targetPathToDataTypeMap); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - List<Pair<String, PartialPath>> sourceTargetPathPairList = intoPathDescriptor.getSourceTargetPathPairList(); List<String> sourceColumnToViewList = intoPathDescriptor.getSourceColumnToViewList(); @@ -1824,7 +1785,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP calculateStatementSizePerLine(deviceToTargetPathDataTypeMap.get(sourceDevice)); } - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new DeviceViewIntoOperator( operatorContext, child, @@ -1913,7 +1873,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP List<ColumnMerger> mergers = createColumnMergers(outputColumns, timeComparator); List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new RowBasedTimeJoinOperator( operatorContext, children, @@ -1936,7 +1895,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP HorizontallyConcatOperator.class.getSimpleName()); List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new HorizontallyConcatOperator(operatorContext, children, outputColumnTypes); } @@ -1950,8 +1908,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), ShowQueriesOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - return new ShowQueriesOperator( operatorContext, node.getPlanNodeId(), Coordinator.getInstance()); } @@ -1994,7 +1950,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), ExchangeOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 0); FragmentInstanceId localInstanceId = context.getInstanceContext().getId(); FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId(); @@ -2037,7 +1992,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), IdentitySinkOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); checkArgument( MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null"); @@ -2068,7 +2022,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), ShuffleHelperOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); // TODO implement pipeline division for shuffle sink context.setDegreeOfParallelism(1); @@ -2103,7 +2056,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaFetchMergeOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaFetchMergeOperator(operatorContext, children, node.getStorageGroupList()); } @@ -2117,7 +2069,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaFetchScanOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaFetchScanOperator( node.getPlanNodeId(), operatorContext, @@ -2177,7 +2128,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), UpdateLastCacheOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new UpdateLastCacheOperator( operatorContext, lastQueryScan, @@ -2194,7 +2144,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), UpdateViewPathLastCacheOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new UpdateViewPathLastCacheOperator( operatorContext, lastQueryScan, @@ -2220,7 +2169,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), AlignedUpdateLastCacheOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new AlignedUpdateLastCacheOperator( operatorContext, lastQueryScan, @@ -2236,7 +2184,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), AlignedUpdateViewPathLastCacheOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new AlignedUpdateViewPathLastCacheOperator( operatorContext, lastQueryScan, @@ -2283,7 +2230,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ((DataDriverContext) context.getDriverContext()) .addSourceOperator(seriesAggregationScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return seriesAggregationScanOperator; } @@ -2326,7 +2272,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ((DataDriverContext) context.getDriverContext()) .addSourceOperator(seriesAggregationScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(unCachedPath); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, aggregators.size()); return seriesAggregationScanOperator; } @@ -2423,7 +2368,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), LastQueryOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new LastQueryOperator(operatorContext, operatorList, builder); } else { // order by timeseries @@ -2454,7 +2398,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), LastQuerySortOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new LastQuerySortOperator(operatorContext, builder.build(), operatorList, comparator); } } @@ -2479,7 +2422,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ? ASC_BINARY_COMPARATOR : DESC_BINARY_COMPARATOR; - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new LastQueryMergeOperator(operatorContext, children, comparator); } @@ -2495,7 +2437,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), LastQueryCollectOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new LastQueryCollectOperator(operatorContext, children); } @@ -2511,7 +2452,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), LastQueryCollectOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new LastQueryTransformOperator( node.getViewPath(), node.getDataType(), operatorContext, operator); } @@ -2567,7 +2507,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaQueryScanOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaQueryScanOperator<>( node.getPlanNodeId(), operatorContext, @@ -2584,7 +2523,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), SchemaQueryScanOperator.class.getSimpleName()); - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new SchemaQueryScanOperator<>( node.getPlanNodeId(), operatorContext, @@ -2761,7 +2699,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP childNode.getPlanNodeId(), childOperation.calculateMaxReturnSize()); - context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(), 1); context.addExchangeOperator(sourceOperator); return sourceOperator; } @@ -2846,9 +2783,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP childNode.getPlanNodeId(), childOperation.calculateMaxReturnSize()); context.getCurrentPipelineDriverFactory().setDownstreamOperator(sourceOperator); - context - .getTimeSliceAllocator() - .recordExecutionWeight(sourceOperator.getOperatorContext(), 1); parentPipelineChildren.add(sourceOperator); context.addExchangeOperator(sourceOperator); int childExchangeNum = subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
