This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/no_data_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0974d770eda65f06d7eefb09447fca2000b6cc69
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Thu Jun 9 19:33:58 2022 +0800

    fix the issue that NPE will be threw when query the series with no data
---
 .../iotdb/commons/partition/DataPartition.java     |  6 +-
 .../planner/distribution/ExchangeNodeAdder.java    | 17 ++++-
 .../db/mpp/plan/planner/plan/PlanFragment.java     |  3 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   | 18 +++--
 .../distribution/NoDataRegionPlanningTest.java     | 84 ++++++++++++++++++++++
 5 files changed, 120 insertions(+), 8 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 4f343dab68..34a10ae161 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -40,7 +40,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 public class DataPartition extends Partition {
-
+  public static final TRegionReplicaSet NOT_ASSIGNED = new TRegionReplicaSet();
   // Map<StorageGroup, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionMessage>>>>
   private Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
       dataPartitionMap;
@@ -78,6 +78,10 @@ public class DataPartition extends Partition {
       String deviceName, List<TTimePartitionSlot> timePartitionSlotList) {
     String storageGroup = getStorageGroupByDevice(deviceName);
     TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceName);
+    if (!dataPartitionMap.containsKey(storageGroup)
+        || 
!dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) {
+      return Collections.singletonList(NOT_ASSIGNED);
+    }
     // TODO: (xingtanzjr) the timePartitionIdList is ignored
     return 
dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).values().stream()
         .flatMap(Collection::stream)
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 3373722007..4776721632 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
@@ -238,7 +239,11 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     // parent.
     visitedChildren.forEach(
         child -> {
-          if 
(!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) 
{
+          // If the child's region is NOT_ASSIGNED, it means the child do not 
belong to any
+          // existing DataRegion. We make it belong to its parent and no 
ExchangeNode will be added.
+          if (context.getNodeDistribution(child.getPlanNodeId()).region
+                  != DataPartition.NOT_ASSIGNED
+              && 
!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) {
             ExchangeNode exchangeNode =
                 new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
             exchangeNode.setChild(child);
@@ -286,8 +291,16 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
                       return region;
                     },
                     Collectors.counting()));
+    if (groupByRegion.entrySet().size() == 1) {
+      return groupByRegion.entrySet().iterator().next().getKey();
+    }
     // Step 2: return the RegionReplicaSet with max count
-    return Collections.max(groupByRegion.entrySet(), 
Map.Entry.comparingByValue()).getKey();
+    return Collections.max(
+            groupByRegion.entrySet().stream()
+                .filter(e -> e.getKey() != DataPartition.NOT_ASSIGNED)
+                .collect(Collectors.toList()),
+            Map.Entry.comparingByValue())
+        .getKey();
   }
 
   private TRegionReplicaSet calculateSchemaRegionByChildren(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index b9c835dca0..f7a91edcbe 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
@@ -84,7 +85,7 @@ public class PlanFragment {
     }
     for (PlanNode child : root.getChildren()) {
       TRegionReplicaSet result = getNodeRegion(child);
-      if (result != null) {
+      if (result != null && result != DataPartition.NOT_ASSIGNED) {
         return result;
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index 72345865a1..f395d66a7f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -74,7 +76,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("SeriesScan-%s", node.getPlanNodeId().getId()));
     boxValue.add(String.format("Series: %s", node.getSeriesPath()));
-    boxValue.add(String.format("PartitionId: %s", 
node.getRegionReplicaSet().getRegionId().id));
+    boxValue.add(printRegion(node.getRegionReplicaSet()));
     return render(node, boxValue, context);
   }
 
@@ -86,7 +88,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
         String.format(
             "Series: %s%s",
             node.getAlignedPath().getDevice(), 
node.getAlignedPath().getMeasurementList()));
-    boxValue.add(String.format("PartitionId: %s", 
node.getRegionReplicaSet().getRegionId().id));
+    boxValue.add(printRegion(node.getRegionReplicaSet()));
     return render(node, boxValue, context);
   }
 
@@ -102,7 +104,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
           String.format(
               "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), 
descriptor.getStep()));
     }
-    boxValue.add(String.format("PartitionId: %s", 
node.getRegionReplicaSet().getRegionId().id));
+    boxValue.add(printRegion(node.getRegionReplicaSet()));
     return render(node, boxValue, context);
   }
 
@@ -121,7 +123,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
           String.format(
               "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), 
descriptor.getStep()));
     }
-    boxValue.add(String.format("PartitionId: %s", 
node.getRegionReplicaSet().getRegionId().id));
+    boxValue.add(printRegion(node.getRegionReplicaSet()));
     return render(node, boxValue, context);
   }
 
@@ -253,6 +255,14 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  private String printRegion(TRegionReplicaSet regionReplicaSet) {
+    return String.format(
+        "Partition: %s",
+        regionReplicaSet == null || regionReplicaSet == 
DataPartition.NOT_ASSIGNED
+            ? "Not Assigned"
+            : String.valueOf(regionReplicaSet.getRegionId().id));
+  }
+
   private List<String> render(PlanNode node, List<String> nodeBoxString, 
GraphContext context) {
     Box box = new Box(nodeBoxString);
     List<List<String>> children = new ArrayList<>();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/NoDataRegionPlanningTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/NoDataRegionPlanningTest.java
new file mode 100644
index 0000000000..1caaedf559
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/NoDataRegionPlanningTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class NoDataRegionPlanningTest {
+  @Test
+  public void testParallelPlan() throws IllegalPathException {
+    String d1s1 = "root.sg.d1.s1";
+    String d1s2 = "root.sg.d1.s2";
+    String d3s1 = "root.sg.d333.s1";
+    String d5s1 = "root.sg.d55555.s1";
+
+    QueryId queryId = new QueryId("test_query");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), 
OrderBy.TIMESTAMP_ASC);
+
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d1s1, TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d1s2, TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d3s1, TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d5s1, TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+    Analysis analysis = Util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+  }
+}

Reply via email to