This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-3134 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 40de06535b65b830e52adcf8e81535076794f80e Author: JackieTien97 <[email protected]> AuthorDate: Mon May 9 20:34:04 2022 +0800 [IOTDB-3134] Calculating allSensors fields in LocalExecutionPlanner --- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 3a8919af44..3f3c1515f5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -106,9 +106,12 @@ import org.apache.commons.lang3.Validate; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -189,7 +192,7 @@ public class LocalExecutionPlanner { new SeriesScanOperator( node.getPlanNodeId(), seriesPath, - node.getAllSensors(), + context.getAllSensors(seriesPath.getDeviceIdString(), seriesPath.getMeasurement()), seriesPath.getSeriesType(), operatorContext, node.getTimeFilter(), @@ -375,7 +378,7 @@ public class LocalExecutionPlanner { new SeriesAggregateScanOperator( node.getPlanNodeId(), seriesPath, - node.getAllSensors(), + context.getAllSensors(seriesPath.getDeviceIdString(), seriesPath.getMeasurement()), operatorContext, aggregators, node.getTimeFilter(), @@ -693,6 +696,8 @@ public class LocalExecutionPlanner { private static class LocalExecutionPlanContext { private final FragmentInstanceContext instanceContext; private final List<PartialPath> paths; + // deviceId -> sensorId Set + private final Map<String, Set<String>> allSensorsMap; // Used to lock corresponding query resources private final List<DataSourceOperator> sourceOperators; private ISinkHandle sinkHandle; @@ -706,12 +711,14 @@ public class LocalExecutionPlanner { this.typeProvider = typeProvider; this.instanceContext = instanceContext; this.paths = new ArrayList<>(); + this.allSensorsMap = new HashMap<>(); this.sourceOperators = new ArrayList<>(); } public LocalExecutionPlanContext(FragmentInstanceContext instanceContext) { this.instanceContext = instanceContext; this.paths = new ArrayList<>(); + this.allSensorsMap = new HashMap<>(); this.sourceOperators = new ArrayList<>(); } @@ -723,6 +730,12 @@ public class LocalExecutionPlanner { return paths; } + public Set<String> getAllSensors(String deviceId, String sensorId) { + Set<String> allSensors = allSensorsMap.computeIfAbsent(deviceId, k -> new HashSet<>()); + allSensors.add(sensorId); + return allSensors; + } + public List<DataSourceOperator> getSourceOperators() { return sourceOperators; }
