This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0acd405bc0c Remove wrong TimeSliceAllocator (#11569)
0acd405bc0c is described below

commit 0acd405bc0ca9515ec1e8273ec1105f743fa91ba
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Nov 21 10:32:26 2023 +0800

    Remove wrong TimeSliceAllocator (#11569)
---
 .../execution/driver/DriverContext.java            |  8 ---
 .../execution/operator/OperatorContext.java        | 12 +++-
 .../execution/timer/ITimeSliceAllocator.java       | 29 ---------
 .../timer/RuleBasedTimeSliceAllocator.java         | 69 ----------------------
 .../plan/planner/LocalExecutionPlanContext.java    | 11 ----
 .../plan/planner/OperatorTreeGenerator.java        | 66 ---------------------
 6 files changed, 9 insertions(+), 186 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
index 5623e8a3439..79cb4acf3b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java
@@ -25,7 +25,6 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTaskId;
-import 
org.apache.iotdb.db.queryengine.execution.timer.RuleBasedTimeSliceAllocator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 
 import java.util.ArrayList;
@@ -39,7 +38,6 @@ public class DriverContext {
   private final FragmentInstanceContext fragmentInstanceContext;
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
   private ISink sink;
-  private final RuleBasedTimeSliceAllocator timeSliceAllocator;
 
   private int dependencyDriverIndex = -1;
   private ExchangeOperator downstreamOperator;
@@ -50,13 +48,11 @@ public class DriverContext {
   @TestOnly
   public DriverContext() {
     this.fragmentInstanceContext = null;
-    this.timeSliceAllocator = null;
   }
 
   public DriverContext(FragmentInstanceContext fragmentInstanceContext, int 
pipelineId) {
     this.fragmentInstanceContext = fragmentInstanceContext;
     this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), 
pipelineId);
-    this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
   }
 
   public OperatorContext addOperatorContext(
@@ -108,10 +104,6 @@ public class DriverContext {
     return operatorContexts;
   }
 
-  public RuleBasedTimeSliceAllocator getTimeSliceAllocator() {
-    return timeSliceAllocator;
-  }
-
   public int getPipelineId() {
     return driverTaskID.getPipelineId();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index 4a9b4c0a6ea..709e34fb0eb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.execution.operator;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
@@ -28,6 +29,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import io.airlift.units.Duration;
 
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Contains information about {@link Operator} execution.
@@ -36,12 +38,16 @@ import java.util.Objects;
  */
 public class OperatorContext {
 
+  private static Duration maxRunTime =
+      new Duration(
+          IoTDBDescriptor.getInstance().getConfig().getDriverTaskExecutionTimeSliceInMs(),
+          TimeUnit.MILLISECONDS);
+
   private final int operatorId;
   // It seems it's never used.
   private final PlanNodeId planNodeId;
   private final String operatorType;
   private DriverContext driverContext;
-  private Duration maxRunTime;
 
   private long totalExecutionTimeInNanos = 0L;
   private long nextCalledCount = 0L;
@@ -90,8 +96,8 @@ public class OperatorContext {
     return maxRunTime;
   }
 
-  public void setMaxRunTime(Duration maxRunTime) {
-    this.maxRunTime = maxRunTime;
+  public static void setMaxRunTime(Duration maxRunTime) {
+    OperatorContext.maxRunTime = maxRunTime;
   }
 
   public SessionInfo getSessionInfo() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java
deleted file mode 100644
index ef2788e957d..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/ITimeSliceAllocator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.execution.timer;
-
-import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-
-import io.airlift.units.Duration;
-
-public interface ITimeSliceAllocator {
-
-  Duration getMaxRunTime(OperatorContext operatorContext);
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java
deleted file mode 100644
index e934ebba463..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/timer/RuleBasedTimeSliceAllocator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.execution.timer;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-
-import io.airlift.units.Duration;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-
-public class RuleBasedTimeSliceAllocator implements ITimeSliceAllocator {
-
-  private static final long EXECUTION_TIME_SLICE_IN_MS =
-      new Duration(
-              
IoTDBDescriptor.getInstance().getConfig().getDriverTaskExecutionTimeSliceInMs(),
-              TimeUnit.MILLISECONDS)
-          .roundTo(TimeUnit.MILLISECONDS);
-
-  private final Map<OperatorContext, Integer> operatorToWeightMap;
-
-  private int totalWeight;
-
-  public RuleBasedTimeSliceAllocator() {
-    this.operatorToWeightMap = new HashMap<>();
-    this.totalWeight = 0;
-  }
-
-  public void recordExecutionWeight(OperatorContext operatorContext, int 
weight) {
-    checkState(
-        !operatorToWeightMap.containsKey(operatorContext), "Same operator has 
been weighted");
-    operatorToWeightMap.put(operatorContext, weight);
-    totalWeight += weight;
-  }
-
-  private int getWeight(OperatorContext operatorContext) {
-    checkState(
-        operatorToWeightMap.containsKey(operatorContext), "This operator has 
not been weighted");
-    return operatorToWeightMap.get(operatorContext);
-  }
-
-  @Override
-  public Duration getMaxRunTime(OperatorContext operatorContext) {
-    return new Duration(
-        (double) EXECUTION_TIME_SLICE_IN_MS * getWeight(operatorContext) / 
totalWeight,
-        TimeUnit.MILLISECONDS);
-  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
index fc190a7ae03..018789b64f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java
@@ -28,7 +28,6 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
-import 
org.apache.iotdb.db.queryengine.execution.timer.RuleBasedTimeSliceAllocator;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -134,12 +133,6 @@ public class LocalExecutionPlanContext {
 
   public void addPipelineDriverFactory(
       Operator operation, DriverContext driverContext, long 
estimatedMemorySize) {
-    driverContext
-        .getOperatorContexts()
-        .forEach(
-            operatorContext ->
-                operatorContext.setMaxRunTime(
-                    
driverContext.getTimeSliceAllocator().getMaxRunTime(operatorContext)));
     pipelineDriverFactories.add(
         new PipelineDriverFactory(operation, driverContext, 
estimatedMemorySize));
   }
@@ -264,10 +257,6 @@ public class LocalExecutionPlanContext {
     return typeProvider;
   }
 
-  public RuleBasedTimeSliceAllocator getTimeSliceAllocator() {
-    return driverContext.getTimeSliceAllocator();
-  }
-
   public FragmentInstanceContext getInstanceContext() {
     return driverContext.getFragmentInstanceContext();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index bc70aff0fe9..b97dee886e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -323,7 +323,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(seriesScanOperator);
     ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
     context.getDriverContext().setInputDriver(true);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return seriesScanOperator;
   }
 
@@ -363,9 +362,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(seriesScanOperator);
     ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
     context.getDriverContext().setInputDriver(true);
-    context
-        .getTimeSliceAllocator()
-        .recordExecutionWeight(operatorContext, seriesPath.getColumnNum());
     return seriesScanOperator;
   }
 
@@ -429,7 +425,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(aggregateScanOperator);
     ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
     context.getDriverContext().setInputDriver(true);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
     return aggregateScanOperator;
   }
 
@@ -522,7 +517,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         .addSourceOperator(seriesAggregationScanOperator);
     ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
     context.getDriverContext().setInputDriver(true);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
     return seriesAggregationScanOperator;
   }
 
@@ -540,7 +534,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 SchemaQueryOrderByHeatOperator.class.getSimpleName());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaQueryOrderByHeatOperator(operatorContext, children);
   }
 
@@ -577,7 +570,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaQueryScanOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaQueryScanOperator<>(
         node.getPlanNodeId(),
         operatorContext,
@@ -601,7 +593,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaQueryScanOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaQueryScanOperator<>(
         node.getPlanNodeId(),
         operatorContext,
@@ -626,7 +617,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaQueryMergeOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaQueryMergeOperator(operatorContext, children);
   }
 
@@ -640,7 +630,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 CountMergeOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     if (node.getChildren().get(0) instanceof LevelTimeSeriesCountNode) {
       return new CountGroupByLevelMergeOperator(operatorContext, children);
     } else {
@@ -657,7 +646,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaCountOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaCountOperator<>(
         node.getPlanNodeId(),
         operatorContext,
@@ -675,7 +663,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaCountOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaCountOperator<>(
         node.getPlanNodeId(),
         operatorContext,
@@ -697,7 +684,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 CountGroupByLevelScanOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new CountGroupByLevelScanOperator<>(
         node.getPlanNodeId(),
         operatorContext,
@@ -720,7 +706,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaQueryScanOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaQueryScanOperator<>(
         node.getPlanNodeId(),
         operatorContext,
@@ -739,7 +724,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 NodeManageMemoryMergeOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new NodeManageMemoryMergeOperator(operatorContext, node.getData(), 
child);
   }
 
@@ -754,7 +738,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 NodePathsConvertOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new NodePathsConvertOperator(operatorContext, child);
   }
 
@@ -768,7 +751,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 NodePathsCountOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new NodePathsCountOperator(operatorContext, child);
   }
 
@@ -791,7 +773,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     if (outputColumnTypes == null || outputColumnTypes.isEmpty()) {
       throw new IllegalStateException("OutputColumTypes should not be 
null/empty");
     }
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SingleDeviceViewOperator(
         operatorContext, node.getDevice(), child, deviceColumnIndex, 
outputColumnTypes);
   }
@@ -812,7 +793,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             .collect(Collectors.toList());
     List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new DeviceViewOperator(
         operatorContext, node.getDevices(), children, deviceColumnIndex, 
outputColumnTypes);
   }
@@ -830,7 +810,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     context.setCachedDataTypes(dataTypes);
     List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, 
context);
     List<SortItem> sortItemList = 
node.getMergeOrderParameter().getSortItemList();
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
     List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
     List<TSDataType> sortItemDataTypeList = new 
ArrayList<>(sortItemList.size());
@@ -860,7 +839,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     context.setCachedDataTypes(dataTypes);
     List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, 
context);
     List<SortItem> sortItemList = 
node.getMergeOrderParameter().getSortItemList();
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
     List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
     List<TSDataType> sortItemDataTypeList = new 
ArrayList<>(sortItemList.size());
@@ -936,11 +914,9 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     switch (fillPolicy) {
       case VALUE:
         Literal literal = descriptor.getFillValue();
-        context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
         return new FillOperator(
             operatorContext, getConstantFill(inputColumns, inputDataTypes, 
literal), child);
       case PREVIOUS:
-        context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
         return new FillOperator(
             operatorContext,
             getPreviousFill(
@@ -955,7 +931,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                         .getZoneId())),
             child);
       case LINEAR:
-        context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
         return new LinearFillOperator(
             operatorContext, getLinearFill(inputColumns, inputDataTypes), 
child);
       default:
@@ -1111,8 +1086,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
       ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, 
projectExpression);
     }
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-
     boolean hasNonMappableUDF = false;
     for (Expression expression : projectExpressions) {
       if (!expression.isMappable(expressionTypes)) {
@@ -1203,7 +1176,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 FilterAndProjectOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
     for (Expression projectExpression : projectExpressions) {
       ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, 
projectExpression);
@@ -1305,7 +1277,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   TransformOperator.class.getSimpleName());
-      context.getTimeSliceAllocator().recordExecutionWeight(transformContext, 
1);
       return new TransformOperator(
           transformContext,
           filter,
@@ -1365,7 +1336,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         calculateMaxAggregationResultSize(
             aggregationDescriptors, timeRangeIterator, 
context.getTypeProvider());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
     return new AggregationOperator(
         operatorContext, aggregators, timeRangeIterator, children, 
maxReturnSize);
   }
@@ -1428,7 +1398,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 TagAggregationOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregatorCount);
     long maxReturnSize =
         calculateMaxAggregationResultSize(
             aggregationDescriptors, timeRangeIterator, 
context.getTypeProvider());
@@ -1478,7 +1447,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         calculateMaxAggregationResultSize(
             aggregationDescriptors, timeRangeIterator, 
context.getTypeProvider());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
     return new SlidingWindowAggregationOperator(
         operatorContext,
         aggregators,
@@ -1500,7 +1468,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 LimitOperator.class.getSimpleName());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new LimitOperator(operatorContext, node.getLimit(), child);
   }
 
@@ -1515,7 +1482,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 OffsetOperator.class.getSimpleName());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new OffsetOperator(operatorContext, node.getOffset(), child);
   }
 
@@ -1558,7 +1524,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   RawDataAggregationOperator.class.getSimpleName());
-      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
 
       ITimeRangeIterator timeRangeIterator =
           initTimeRangeIterator(groupByTimeParameter, ascending, true);
@@ -1660,7 +1625,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           calculateMaxAggregationResultSize(
               aggregationDescriptors, timeRangeIterator, 
context.getTypeProvider());
 
-      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
       return new AggregationOperator(
           operatorContext, aggregators, timeRangeIterator, children, 
maxReturnSize);
     }
@@ -1702,7 +1666,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
 
     List<SortItem> sortItemList = node.getOrderByParameter().getSortItemList();
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
     List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
     List<TSDataType> sortItemDataTypeList = new 
ArrayList<>(sortItemList.size());
@@ -1760,8 +1723,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         intoPathDescriptor.getTargetPathToDataTypeMap();
     long statementSizePerLine = 
calculateStatementSizePerLine(targetPathToDataTypeMap);
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-
     List<Pair<String, PartialPath>> sourceTargetPathPairList =
         intoPathDescriptor.getSourceTargetPathPairList();
     List<String> sourceColumnToViewList = 
intoPathDescriptor.getSourceColumnToViewList();
@@ -1824,7 +1785,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           
calculateStatementSizePerLine(deviceToTargetPathDataTypeMap.get(sourceDevice));
     }
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new DeviceViewIntoOperator(
         operatorContext,
         child,
@@ -1913,7 +1873,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     List<ColumnMerger> mergers = createColumnMergers(outputColumns, 
timeComparator);
     List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new RowBasedTimeJoinOperator(
         operatorContext,
         children,
@@ -1936,7 +1895,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 HorizontallyConcatOperator.class.getSimpleName());
     List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new HorizontallyConcatOperator(operatorContext, children, 
outputColumnTypes);
   }
 
@@ -1950,8 +1908,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 ShowQueriesOperator.class.getSimpleName());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-
     return new ShowQueriesOperator(
         operatorContext, node.getPlanNodeId(), Coordinator.getInstance());
   }
@@ -1994,7 +1950,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 ExchangeOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 0);
 
     FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
     FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
@@ -2037,7 +1992,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 IdentitySinkOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
     checkArgument(
         MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should 
not be null");
@@ -2068,7 +2022,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 ShuffleHelperOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
     // TODO implement pipeline division for shuffle sink
     context.setDegreeOfParallelism(1);
@@ -2103,7 +2056,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaFetchMergeOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaFetchMergeOperator(operatorContext, children, 
node.getStorageGroupList());
   }
 
@@ -2117,7 +2069,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaFetchScanOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaFetchScanOperator(
         node.getPlanNodeId(),
         operatorContext,
@@ -2177,7 +2128,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   UpdateLastCacheOperator.class.getSimpleName());
-      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
       return new UpdateLastCacheOperator(
           operatorContext,
           lastQueryScan,
@@ -2194,7 +2144,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   UpdateViewPathLastCacheOperator.class.getSimpleName());
-      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
       return new UpdateViewPathLastCacheOperator(
           operatorContext,
           lastQueryScan,
@@ -2220,7 +2169,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   AlignedUpdateLastCacheOperator.class.getSimpleName());
-      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
       return new AlignedUpdateLastCacheOperator(
           operatorContext,
           lastQueryScan,
@@ -2236,7 +2184,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   
AlignedUpdateViewPathLastCacheOperator.class.getSimpleName());
-      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
       return new AlignedUpdateViewPathLastCacheOperator(
           operatorContext,
           lastQueryScan,
@@ -2283,7 +2230,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     ((DataDriverContext) context.getDriverContext())
         .addSourceOperator(seriesAggregationScanOperator);
     ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
     return seriesAggregationScanOperator;
   }
 
@@ -2326,7 +2272,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     ((DataDriverContext) context.getDriverContext())
         .addSourceOperator(seriesAggregationScanOperator);
     ((DataDriverContext) context.getDriverContext()).addPath(unCachedPath);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
aggregators.size());
     return seriesAggregationScanOperator;
   }
 
@@ -2423,7 +2368,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   LastQueryOperator.class.getSimpleName());
-      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
       return new LastQueryOperator(operatorContext, operatorList, builder);
     } else {
       // order by timeseries
@@ -2454,7 +2398,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   LastQuerySortOperator.class.getSimpleName());
-      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 
1);
       return new LastQuerySortOperator(operatorContext, builder.build(), 
operatorList, comparator);
     }
   }
@@ -2479,7 +2422,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             ? ASC_BINARY_COMPARATOR
             : DESC_BINARY_COMPARATOR;
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new LastQueryMergeOperator(operatorContext, children, comparator);
   }
 
@@ -2495,7 +2437,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 LastQueryCollectOperator.class.getSimpleName());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new LastQueryCollectOperator(operatorContext, children);
   }
 
@@ -2511,7 +2452,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 LastQueryCollectOperator.class.getSimpleName());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new LastQueryTransformOperator(
         node.getViewPath(), node.getDataType(), operatorContext, operator);
   }
@@ -2567,7 +2507,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaQueryScanOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaQueryScanOperator<>(
         node.getPlanNodeId(),
         operatorContext,
@@ -2584,7 +2523,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
                 SchemaQueryScanOperator.class.getSimpleName());
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
     return new SchemaQueryScanOperator<>(
         node.getPlanNodeId(),
         operatorContext,
@@ -2761,7 +2699,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             childNode.getPlanNodeId(),
             childOperation.calculateMaxReturnSize());
 
-    
context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(),
 1);
     context.addExchangeOperator(sourceOperator);
     return sourceOperator;
   }
@@ -2846,9 +2783,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                   childNode.getPlanNodeId(),
                   childOperation.calculateMaxReturnSize());
           
context.getCurrentPipelineDriverFactory().setDownstreamOperator(sourceOperator);
-          context
-              .getTimeSliceAllocator()
-              .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
           parentPipelineChildren.add(sourceOperator);
           context.addExchangeOperator(sourceOperator);
           int childExchangeNum = subContext.getExchangeSumNum() - 
context.getExchangeSumNum() + 1;


Reply via email to