This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/no_data_query in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0974d770eda65f06d7eefb09447fca2000b6cc69 Author: Jinrui.Zhang <[email protected]> AuthorDate: Thu Jun 9 19:33:58 2022 +0800 fix the issue that NPE will be thrown when querying the series with no data --- .../iotdb/commons/partition/DataPartition.java | 6 +- .../planner/distribution/ExchangeNodeAdder.java | 17 ++++- .../db/mpp/plan/planner/plan/PlanFragment.java | 3 +- .../plan/planner/plan/node/PlanGraphPrinter.java | 18 +++-- .../distribution/NoDataRegionPlanningTest.java | 84 ++++++++++++++++++++++ 5 files changed, 120 insertions(+), 8 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 4f343dab68..34a10ae161 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -40,7 +40,7 @@ import java.util.Set; import java.util.stream.Collectors; public class DataPartition extends Partition { - + public static final TRegionReplicaSet NOT_ASSIGNED = new TRegionReplicaSet(); // Map<StorageGroup, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionMessage>>>> private Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> dataPartitionMap; @@ -78,6 +78,10 @@ public class DataPartition extends Partition { String deviceName, List<TTimePartitionSlot> timePartitionSlotList) { String storageGroup = getStorageGroupByDevice(deviceName); TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + if (!dataPartitionMap.containsKey(storageGroup) + || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) { + return Collections.singletonList(NOT_ASSIGNED); + } // TODO: (xingtanzjr) the timePartitionIdList is ignored return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).values().stream() .flatMap(Collection::stream) diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java index 3373722007..4776721632 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode; @@ -238,7 +239,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { // parent. visitedChildren.forEach( child -> { - if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) { + // If the child's region is NOT_ASSIGNED, it means the child do not belong to any + // existing DataRegion. We make it belong to its parent and no ExchangeNode will be added. 
+ if (context.getNodeDistribution(child.getPlanNodeId()).region + != DataPartition.NOT_ASSIGNED + && !dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) { ExchangeNode exchangeNode = new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId()); exchangeNode.setChild(child); @@ -286,8 +291,16 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return region; }, Collectors.counting())); + if (groupByRegion.entrySet().size() == 1) { + return groupByRegion.entrySet().iterator().next().getKey(); + } // Step 2: return the RegionReplicaSet with max count - return Collections.max(groupByRegion.entrySet(), Map.Entry.comparingByValue()).getKey(); + return Collections.max( + groupByRegion.entrySet().stream() + .filter(e -> e.getKey() != DataPartition.NOT_ASSIGNED) + .collect(Collectors.toList()), + Map.Entry.comparingByValue()) + .getKey(); } private TRegionReplicaSet calculateSchemaRegionByChildren( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java index b9c835dca0..f7a91edcbe 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode; @@ -84,7 +85,7 @@ public class PlanFragment { } for (PlanNode child : root.getChildren()) { TRegionReplicaSet result = getNodeRegion(child); - if (result != null) { + if (result != null && result != DataPartition.NOT_ASSIGNED) { return result; } } diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java index 72345865a1..f395d66a7f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode; @@ -74,7 +76,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("SeriesScan-%s", node.getPlanNodeId().getId())); boxValue.add(String.format("Series: %s", node.getSeriesPath())); - boxValue.add(String.format("PartitionId: %s", node.getRegionReplicaSet().getRegionId().id)); + boxValue.add(printRegion(node.getRegionReplicaSet())); return render(node, boxValue, context); } @@ -86,7 +88,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter String.format( "Series: %s%s", node.getAlignedPath().getDevice(), node.getAlignedPath().getMeasurementList())); - boxValue.add(String.format("PartitionId: %s", node.getRegionReplicaSet().getRegionId().id)); + boxValue.add(printRegion(node.getRegionReplicaSet())); return render(node, boxValue, context); } @@ -102,7 +104,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter String.format( "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep())); } - boxValue.add(String.format("PartitionId: %s", 
node.getRegionReplicaSet().getRegionId().id)); + boxValue.add(printRegion(node.getRegionReplicaSet())); return render(node, boxValue, context); } @@ -121,7 +123,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter String.format( "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep())); } - boxValue.add(String.format("PartitionId: %s", node.getRegionReplicaSet().getRegionId().id)); + boxValue.add(printRegion(node.getRegionReplicaSet())); return render(node, boxValue, context); } @@ -253,6 +255,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter return render(node, boxValue, context); } + private String printRegion(TRegionReplicaSet regionReplicaSet) { + return String.format( + "Partition: %s", + regionReplicaSet == null || regionReplicaSet == DataPartition.NOT_ASSIGNED + ? "Not Assigned" + : String.valueOf(regionReplicaSet.getRegionId().id)); + } + private List<String> render(PlanNode node, List<String> nodeBoxString, GraphContext context) { Box box = new Box(nodeBoxString); List<List<String>> children = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/NoDataRegionPlanningTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/NoDataRegionPlanningTest.java new file mode 100644 index 0000000000..1caaedf559 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/NoDataRegionPlanningTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.plan.distribution; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.mpp.common.MPPQueryContext; +import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.plan.analyze.Analysis; +import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner; +import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan; +import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class NoDataRegionPlanningTest { + @Test + public void testParallelPlan() throws IllegalPathException { + String d1s1 = "root.sg.d1.s1"; + String d1s2 = "root.sg.d1.s2"; + String d3s1 = "root.sg.d333.s1"; + String d5s1 = "root.sg.d55555.s1"; + + QueryId queryId = new QueryId("test_query"); + TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC); + + timeJoinNode.addChild( + new SeriesScanNode( + queryId.genPlanNodeId(), + new MeasurementPath(d1s1, TSDataType.INT32), 
+ OrderBy.TIMESTAMP_ASC)); + timeJoinNode.addChild( + new SeriesScanNode( + queryId.genPlanNodeId(), + new MeasurementPath(d1s2, TSDataType.INT32), + OrderBy.TIMESTAMP_ASC)); + timeJoinNode.addChild( + new SeriesScanNode( + queryId.genPlanNodeId(), + new MeasurementPath(d3s1, TSDataType.INT32), + OrderBy.TIMESTAMP_ASC)); + timeJoinNode.addChild( + new SeriesScanNode( + queryId.genPlanNodeId(), + new MeasurementPath(d5s1, TSDataType.INT32), + OrderBy.TIMESTAMP_ASC)); + + LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10); + + Analysis analysis = Util.constructAnalysis(); + + MPPQueryContext context = + new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint()); + DistributionPlanner planner = + new DistributionPlanner(analysis, new LogicalQueryPlan(context, root)); + DistributedQueryPlan plan = planner.planFragments(); + assertEquals(3, plan.getInstances().size()); + } +}
