This is an automated email from the ASF dual-hosted git repository.
lancelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0acd405bc0c Remove wrong TimeSliceAllocator (#11569)
0acd405bc0c is described below
commit 0acd405bc0ca9515ec1e8273ec1105f743fa91ba
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Nov 21 10:32:26 2023 +0800
Remove wrong TimeSliceAllocator (#11569)
---
.../execution/driver/DriverContext.java | 8 ---
.../execution/operator/OperatorContext.java | 12 +++-
.../execution/timer/ITimeSliceAllocator.java | 29 ---------
.../timer/RuleBasedTimeSliceAllocator.java | 69 ----------------------
.../plan/planner/LocalExecutionPlanContext.java | 11 ----
.../plan/planner/OperatorTreeGenerator.java | 66 ---------------------
6 files changed, 9 insertions(+), 186 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
index 5623e8a3439..79cb4acf3b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
@@ -25,7 +25,6 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTaskId;
-import
org.apache.iotdb.db.queryengine.execution.timer.RuleBasedTimeSliceAllocator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import java.util.ArrayList;
@@ -39,7 +38,6 @@ public class DriverContext {
private final FragmentInstanceContext fragmentInstanceContext;
private final List<OperatorContext> operatorContexts = new ArrayList<>();
private ISink sink;
- private final RuleBasedTimeSliceAllocator timeSliceAllocator;
private int dependencyDriverIndex = -1;
private ExchangeOperator downstreamOperator;
@@ -50,13 +48,11 @@ public class DriverContext {
@TestOnly
public DriverContext() {
this.fragmentInstanceContext = null;
- this.timeSliceAllocator = null;
}
public DriverContext(FragmentInstanceContext fragmentInstanceContext, int
pipelineId) {
this.fragmentInstanceContext = fragmentInstanceContext;
this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(),
pipelineId);
- this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
}
public OperatorContext addOperatorContext(
@@ -108,10 +104,6 @@ public class DriverContext {
return operatorContexts;
}
- public RuleBasedTimeSliceAllocator getTimeSliceAllocator() {
- return timeSliceAllocator;
- }
-
public int getPipelineId() {
return driverTaskID.getPipelineId();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index 4a9b4c0a6ea..709e34fb0eb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.execution.operator;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
@@ -28,6 +29,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import io.airlift.units.Duration;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
/**
* Contains information about {@link Operator} execution.
@@ -36,12 +38,16 @@ import java.util.Objects;
*/
public class OperatorContext {
+ private static Duration maxRunTime =
+ new Duration(
+
IoTDBDescriptor.getInstance().getConfig().getDriverTaskExecutionTimeSliceInMs(),
+ TimeUnit.MILLISECONDS);
+
private final int operatorId;
// It seems it's never used.
private final PlanNodeId planNodeId;
private final String operatorType;
private DriverContext driverContext;
- private Duration maxRunTime;
private long totalExecutionTimeInNanos = 0L;
private long nextCalledCount = 0L;
@@ -90,8 +96,8 @@ public class OperatorContext {
return maxRunTime;
}
- public void setMaxRunTime(Duration maxRunTime) {
- this.maxRunTime = maxRunTime;
+ public static void setMaxRunTime(Duration maxRunTime) {
+ OperatorContext.maxRunTime = maxRunTime;
}
public SessionInfo getSessionInfo() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java
deleted file mode 100644
index ef2788e957d..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.execution.timer;
-
-import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-
-import io.airlift.units.Duration;
-
-public interface ITimeSliceAllocator {
-
- Duration getMaxRunTime(OperatorContext operatorContext);
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java
deleted file mode 100644
index e934ebba463..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.execution.timer;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-
-import io.airlift.units.Duration;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-
-public class RuleBasedTimeSliceAllocator implements ITimeSliceAllocator {
-
- private static final long EXECUTION_TIME_SLICE_IN_MS =
- new Duration(
-
IoTDBDescriptor.getInstance().getConfig().getDriverTaskExecutionTimeSliceInMs(),
- TimeUnit.MILLISECONDS)
- .roundTo(TimeUnit.MILLISECONDS);
-
- private final Map<OperatorContext, Integer> operatorToWeightMap;
-
- private int totalWeight;
-
- public RuleBasedTimeSliceAllocator() {
- this.operatorToWeightMap = new HashMap<>();
- this.totalWeight = 0;
- }
-
- public void recordExecutionWeight(OperatorContext operatorContext, int
weight) {
- checkState(
- !operatorToWeightMap.containsKey(operatorContext), "Same operator has
been weighted");
- operatorToWeightMap.put(operatorContext, weight);
- totalWeight += weight;
- }
-
- private int getWeight(OperatorContext operatorContext) {
- checkState(
- operatorToWeightMap.containsKey(operatorContext), "This operator has
not been weighted");
- return operatorToWeightMap.get(operatorContext);
- }
-
- @Override
- public Duration getMaxRunTime(OperatorContext operatorContext) {
- return new Duration(
- (double) EXECUTION_TIME_SLICE_IN_MS * getWeight(operatorContext) /
totalWeight,
- TimeUnit.MILLISECONDS);
- }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
index fc190a7ae03..018789b64f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
@@ -28,7 +28,6 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
-import
org.apache.iotdb.db.queryengine.execution.timer.RuleBasedTimeSliceAllocator;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -134,12 +133,6 @@ public class LocalExecutionPlanContext {
public void addPipelineDriverFactory(
Operator operation, DriverContext driverContext, long
estimatedMemorySize) {
- driverContext
- .getOperatorContexts()
- .forEach(
- operatorContext ->
- operatorContext.setMaxRunTime(
-
driverContext.getTimeSliceAllocator().getMaxRunTime(operatorContext)));
pipelineDriverFactories.add(
new PipelineDriverFactory(operation, driverContext,
estimatedMemorySize));
}
@@ -264,10 +257,6 @@ public class LocalExecutionPlanContext {
return typeProvider;
}
- public RuleBasedTimeSliceAllocator getTimeSliceAllocator() {
- return driverContext.getTimeSliceAllocator();
- }
-
public FragmentInstanceContext getInstanceContext() {
return driverContext.getFragmentInstanceContext();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index bc70aff0fe9..b97dee886e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -323,7 +323,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
((DataDriverContext)
context.getDriverContext()).addSourceOperator(seriesScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
context.getDriverContext().setInputDriver(true);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return seriesScanOperator;
}
@@ -363,9 +362,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
((DataDriverContext)
context.getDriverContext()).addSourceOperator(seriesScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
context.getDriverContext().setInputDriver(true);
- context
- .getTimeSliceAllocator()
- .recordExecutionWeight(operatorContext, seriesPath.getColumnNum());
return seriesScanOperator;
}
@@ -429,7 +425,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
((DataDriverContext)
context.getDriverContext()).addSourceOperator(aggregateScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
context.getDriverContext().setInputDriver(true);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return aggregateScanOperator;
}
@@ -522,7 +517,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
.addSourceOperator(seriesAggregationScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
context.getDriverContext().setInputDriver(true);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return seriesAggregationScanOperator;
}
@@ -540,7 +534,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
SchemaQueryOrderByHeatOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaQueryOrderByHeatOperator(operatorContext, children);
}
@@ -577,7 +570,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
@@ -601,7 +593,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
@@ -626,7 +617,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryMergeOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaQueryMergeOperator(operatorContext, children);
}
@@ -640,7 +630,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
CountMergeOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
if (node.getChildren().get(0) instanceof LevelTimeSeriesCountNode) {
return new CountGroupByLevelMergeOperator(operatorContext, children);
} else {
@@ -657,7 +646,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaCountOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaCountOperator<>(
node.getPlanNodeId(),
operatorContext,
@@ -675,7 +663,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaCountOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaCountOperator<>(
node.getPlanNodeId(),
operatorContext,
@@ -697,7 +684,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
CountGroupByLevelScanOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new CountGroupByLevelScanOperator<>(
node.getPlanNodeId(),
operatorContext,
@@ -720,7 +706,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
@@ -739,7 +724,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
NodeManageMemoryMergeOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new NodeManageMemoryMergeOperator(operatorContext, node.getData(),
child);
}
@@ -754,7 +738,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
NodePathsConvertOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new NodePathsConvertOperator(operatorContext, child);
}
@@ -768,7 +751,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
NodePathsCountOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new NodePathsCountOperator(operatorContext, child);
}
@@ -791,7 +773,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
if (outputColumnTypes == null || outputColumnTypes.isEmpty()) {
throw new IllegalStateException("OutputColumTypes should not be
null/empty");
}
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SingleDeviceViewOperator(
operatorContext, node.getDevice(), child, deviceColumnIndex,
outputColumnTypes);
}
@@ -812,7 +793,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
.collect(Collectors.toList());
List<TSDataType> outputColumnTypes = getOutputColumnTypes(node,
context.getTypeProvider());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new DeviceViewOperator(
operatorContext, node.getDevices(), children, deviceColumnIndex,
outputColumnTypes);
}
@@ -830,7 +810,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.setCachedDataTypes(dataTypes);
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node,
context);
List<SortItem> sortItemList =
node.getMergeOrderParameter().getSortItemList();
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
List<TSDataType> sortItemDataTypeList = new
ArrayList<>(sortItemList.size());
@@ -860,7 +839,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.setCachedDataTypes(dataTypes);
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node,
context);
List<SortItem> sortItemList =
node.getMergeOrderParameter().getSortItemList();
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
List<TSDataType> sortItemDataTypeList = new
ArrayList<>(sortItemList.size());
@@ -936,11 +914,9 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
switch (fillPolicy) {
case VALUE:
Literal literal = descriptor.getFillValue();
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new FillOperator(
operatorContext, getConstantFill(inputColumns, inputDataTypes,
literal), child);
case PREVIOUS:
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new FillOperator(
operatorContext,
getPreviousFill(
@@ -955,7 +931,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
.getZoneId())),
child);
case LINEAR:
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new LinearFillOperator(
operatorContext, getLinearFill(inputColumns, inputDataTypes),
child);
default:
@@ -1111,8 +1086,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
ExpressionTypeAnalyzer.analyzeExpression(expressionTypes,
projectExpression);
}
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-
boolean hasNonMappableUDF = false;
for (Expression expression : projectExpressions) {
if (!expression.isMappable(expressionTypes)) {
@@ -1203,7 +1176,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
FilterAndProjectOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
for (Expression projectExpression : projectExpressions) {
ExpressionTypeAnalyzer.analyzeExpression(expressionTypes,
projectExpression);
@@ -1305,7 +1277,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
TransformOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(transformContext,
1);
return new TransformOperator(
transformContext,
filter,
@@ -1365,7 +1336,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator,
context.getTypeProvider());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return new AggregationOperator(
operatorContext, aggregators, timeRangeIterator, children,
maxReturnSize);
}
@@ -1428,7 +1398,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
TagAggregationOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregatorCount);
long maxReturnSize =
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator,
context.getTypeProvider());
@@ -1478,7 +1447,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator,
context.getTypeProvider());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return new SlidingWindowAggregationOperator(
operatorContext,
aggregators,
@@ -1500,7 +1468,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
LimitOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new LimitOperator(operatorContext, node.getLimit(), child);
}
@@ -1515,7 +1482,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
OffsetOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new OffsetOperator(operatorContext, node.getOffset(), child);
}
@@ -1558,7 +1524,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
RawDataAggregationOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, true);
@@ -1660,7 +1625,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator,
context.getTypeProvider());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return new AggregationOperator(
operatorContext, aggregators, timeRangeIterator, children,
maxReturnSize);
}
@@ -1702,7 +1666,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
List<TSDataType> dataTypes = getOutputColumnTypes(node,
context.getTypeProvider());
List<SortItem> sortItemList = node.getOrderByParameter().getSortItemList();
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
List<TSDataType> sortItemDataTypeList = new
ArrayList<>(sortItemList.size());
@@ -1760,8 +1723,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
intoPathDescriptor.getTargetPathToDataTypeMap();
long statementSizePerLine =
calculateStatementSizePerLine(targetPathToDataTypeMap);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-
List<Pair<String, PartialPath>> sourceTargetPathPairList =
intoPathDescriptor.getSourceTargetPathPairList();
List<String> sourceColumnToViewList =
intoPathDescriptor.getSourceColumnToViewList();
@@ -1824,7 +1785,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
calculateStatementSizePerLine(deviceToTargetPathDataTypeMap.get(sourceDevice));
}
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new DeviceViewIntoOperator(
operatorContext,
child,
@@ -1913,7 +1873,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
List<ColumnMerger> mergers = createColumnMergers(outputColumns,
timeComparator);
List<TSDataType> outputColumnTypes = getOutputColumnTypes(node,
context.getTypeProvider());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new RowBasedTimeJoinOperator(
operatorContext,
children,
@@ -1936,7 +1895,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
HorizontallyConcatOperator.class.getSimpleName());
List<TSDataType> outputColumnTypes = getOutputColumnTypes(node,
context.getTypeProvider());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new HorizontallyConcatOperator(operatorContext, children,
outputColumnTypes);
}
@@ -1950,8 +1908,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
ShowQueriesOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-
return new ShowQueriesOperator(
operatorContext, node.getPlanNodeId(), Coordinator.getInstance());
}
@@ -1994,7 +1950,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
ExchangeOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 0);
FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
@@ -2037,7 +1992,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
IdentitySinkOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
checkArgument(
MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should
not be null");
@@ -2068,7 +2022,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
ShuffleHelperOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
// TODO implement pipeline division for shuffle sink
context.setDegreeOfParallelism(1);
@@ -2103,7 +2056,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaFetchMergeOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaFetchMergeOperator(operatorContext, children,
node.getStorageGroupList());
}
@@ -2117,7 +2069,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaFetchScanOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaFetchScanOperator(
node.getPlanNodeId(),
operatorContext,
@@ -2177,7 +2128,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
UpdateLastCacheOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new UpdateLastCacheOperator(
operatorContext,
lastQueryScan,
@@ -2194,7 +2144,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
UpdateViewPathLastCacheOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new UpdateViewPathLastCacheOperator(
operatorContext,
lastQueryScan,
@@ -2220,7 +2169,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
AlignedUpdateLastCacheOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new AlignedUpdateLastCacheOperator(
operatorContext,
lastQueryScan,
@@ -2236,7 +2184,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
AlignedUpdateViewPathLastCacheOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new AlignedUpdateViewPathLastCacheOperator(
operatorContext,
lastQueryScan,
@@ -2283,7 +2230,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
((DataDriverContext) context.getDriverContext())
.addSourceOperator(seriesAggregationScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return seriesAggregationScanOperator;
}
@@ -2326,7 +2272,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
((DataDriverContext) context.getDriverContext())
.addSourceOperator(seriesAggregationScanOperator);
((DataDriverContext) context.getDriverContext()).addPath(unCachedPath);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
aggregators.size());
return seriesAggregationScanOperator;
}
@@ -2423,7 +2368,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
LastQueryOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new LastQueryOperator(operatorContext, operatorList, builder);
} else {
// order by timeseries
@@ -2454,7 +2398,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
LastQuerySortOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext,
1);
return new LastQuerySortOperator(operatorContext, builder.build(),
operatorList, comparator);
}
}
@@ -2479,7 +2422,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
? ASC_BINARY_COMPARATOR
: DESC_BINARY_COMPARATOR;
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new LastQueryMergeOperator(operatorContext, children, comparator);
}
@@ -2495,7 +2437,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
LastQueryCollectOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new LastQueryCollectOperator(operatorContext, children);
}
@@ -2511,7 +2452,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
LastQueryCollectOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new LastQueryTransformOperator(
node.getViewPath(), node.getDataType(), operatorContext, operator);
}
@@ -2567,7 +2507,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
@@ -2584,7 +2523,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaQueryScanOperator.class.getSimpleName());
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new SchemaQueryScanOperator<>(
node.getPlanNodeId(),
operatorContext,
@@ -2761,7 +2699,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
childNode.getPlanNodeId(),
childOperation.calculateMaxReturnSize());
-
context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(),
1);
context.addExchangeOperator(sourceOperator);
return sourceOperator;
}
@@ -2846,9 +2783,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
childNode.getPlanNodeId(),
childOperation.calculateMaxReturnSize());
context.getCurrentPipelineDriverFactory().setDownstreamOperator(sourceOperator);
- context
- .getTimeSliceAllocator()
- .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
parentPipelineChildren.add(sourceOperator);
context.addExchangeOperator(sourceOperator);
int childExchangeNum = subContext.getExchangeSumNum() -
context.getExchangeSumNum() + 1;