This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6702808c25 [IOTDB-3414] [mpp] fix schema query limit offset bug (#6179)
6702808c25 is described below

commit 6702808c25ba78e8b53364eef93771e35405efee
Author: xinzhongtianxia <[email protected]>
AuthorDate: Sun Jun 19 22:13:29 2022 +0800

    [IOTDB-3414] [mpp] fix schema query limit offset bug (#6179)
---
 .../mpp/plan/analyze/FakePartitionFetcherImpl.java | 53 +++++++++++++++-
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  | 72 +++++++++++++++++-----
 .../plan/planner/distribution/SourceRewriter.java  |  7 +--
 .../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java | 16 ++---
 4 files changed, 116 insertions(+), 32 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
index 8738ae9cd9..19f7b43fed 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
@@ -43,7 +43,58 @@ public class FakePartitionFetcherImpl implements 
IPartitionFetcher {
 
   @Override
   public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
-    return null;
+    String device1 = "root.sg.d1";
+    String device2 = "root.sg.d22";
+    String device3 = "root.sg.d333";
+
+    SchemaPartition schemaPartition =
+        new SchemaPartition(
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> 
schemaPartitionMap = new HashMap<>();
+
+    Map<TSeriesPartitionSlot, TRegionReplicaSet> regionMap = new HashMap<>();
+    TRegionReplicaSet region1 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1),
+            Arrays.asList(
+                new TDataNodeLocation()
+                    .setDataNodeId(11)
+                    .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
+                new TDataNodeLocation()
+                    .setDataNodeId(12)
+                    .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000))));
+    regionMap.put(new TSeriesPartitionSlot(device1.length()), region1);
+
+    TRegionReplicaSet region2 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 2),
+            Arrays.asList(
+                new TDataNodeLocation()
+                    .setDataNodeId(31)
+                    .setExternalEndPoint(new TEndPoint("192.0.3.1", 9000)),
+                new TDataNodeLocation()
+                    .setDataNodeId(32)
+                    .setExternalEndPoint(new TEndPoint("192.0.3.2", 9000))));
+    regionMap.put(new TSeriesPartitionSlot(device2.length()), region2);
+
+    TRegionReplicaSet region3 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 3),
+            Arrays.asList(
+                new TDataNodeLocation()
+                    .setDataNodeId(11)
+                    .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
+                new TDataNodeLocation()
+                    .setDataNodeId(12)
+                    .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000))));
+    regionMap.put(new TSeriesPartitionSlot(device3.length()), region3);
+
+    schemaPartitionMap.put("root.sg", regionMap);
+
+    schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+    return schemaPartition;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 8221811da1..b94e106bdb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -442,14 +442,27 @@ public class LogicalPlanner {
     public PlanNode visitShowTimeSeries(
         ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext 
context) {
       LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+
+      // If there is only one region, we can push down the offset and limit 
operation to
+      // source operator.
+      boolean canPushDownOffsetLimit =
+          analysis.getSchemaPartitionInfo() != null
+              && 
analysis.getSchemaPartitionInfo().getDistributionInfo().size() == 1;
+
+      int limit = showTimeSeriesStatement.getLimit();
+      int offset = showTimeSeriesStatement.getOffset();
+      if (!canPushDownOffsetLimit) {
+        limit = showTimeSeriesStatement.getLimit() + 
showTimeSeriesStatement.getOffset();
+        offset = 0;
+      }
       planBuilder =
           planBuilder
               .planTimeSeriesSchemaSource(
                   showTimeSeriesStatement.getPathPattern(),
                   showTimeSeriesStatement.getKey(),
                   showTimeSeriesStatement.getValue(),
-                  showTimeSeriesStatement.getLimit(),
-                  showTimeSeriesStatement.getOffset(),
+                  limit,
+                  offset,
                   showTimeSeriesStatement.isOrderByHeat(),
                   showTimeSeriesStatement.isContains(),
                   showTimeSeriesStatement.isPrefixPath())
@@ -464,27 +477,52 @@ public class LogicalPlanner {
                 .getRoot();
         planBuilder = planBuilder.planSchemaQueryOrderByHeat(lastPlanNode);
       }
-      return planBuilder
-          .planOffset(showTimeSeriesStatement.getOffset())
-          .planLimit(showTimeSeriesStatement.getLimit())
-          .getRoot();
+
+      if (!canPushDownOffsetLimit) {
+        return planBuilder
+            .planOffset(showTimeSeriesStatement.getOffset())
+            .planLimit(showTimeSeriesStatement.getLimit())
+            .getRoot();
+      }
+
+      return planBuilder.getRoot();
     }
 
     @Override
     public PlanNode visitShowDevices(
         ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
       LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
-      return planBuilder
-          .planDeviceSchemaSource(
-              showDevicesStatement.getPathPattern(),
-              showDevicesStatement.getLimit(),
-              showDevicesStatement.getOffset(),
-              showDevicesStatement.isPrefixPath(),
-              showDevicesStatement.hasSgCol())
-          .planSchemaQueryMerge(false)
-          .planOffset(showDevicesStatement.getOffset())
-          .planLimit(showDevicesStatement.getLimit())
-          .getRoot();
+
+      // If there is only one region, we can push down the offset and limit 
operation to
+      // source operator.
+      boolean canPushDownOffsetLimit =
+          analysis.getSchemaPartitionInfo() != null
+              && 
analysis.getSchemaPartitionInfo().getDistributionInfo().size() == 1;
+
+      int limit = showDevicesStatement.getLimit();
+      int offset = showDevicesStatement.getOffset();
+      if (!canPushDownOffsetLimit) {
+        limit = showDevicesStatement.getLimit() + 
showDevicesStatement.getOffset();
+        offset = 0;
+      }
+
+      planBuilder =
+          planBuilder
+              .planDeviceSchemaSource(
+                  showDevicesStatement.getPathPattern(),
+                  limit,
+                  offset,
+                  showDevicesStatement.isPrefixPath(),
+                  showDevicesStatement.hasSgCol())
+              .planSchemaQueryMerge(false);
+
+      if (!canPushDownOffsetLimit) {
+        return planBuilder
+            .planOffset(showDevicesStatement.getOffset())
+            .planLimit(showDevicesStatement.getLimit())
+            .getRoot();
+      }
+      return planBuilder.getRoot();
     }
 
     @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 070256c39e..747b9933d1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -177,17 +177,11 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
                   (deviceGroupId, schemaRegionReplicaSet) ->
                       schemaRegions.add(schemaRegionReplicaSet));
             });
-    int count = schemaRegions.size();
     schemaRegions.forEach(
         region -> {
           SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode) 
seed.clone();
           
schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
           schemaQueryScanNode.setRegionReplicaSet(region);
-          if (count > 1) {
-            schemaQueryScanNode.setLimit(
-                schemaQueryScanNode.getOffset() + 
schemaQueryScanNode.getLimit());
-            schemaQueryScanNode.setOffset(0);
-          }
           root.addChild(schemaQueryScanNode);
         });
     return root;
@@ -530,6 +524,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return aggregationNode;
   }
 
+  @Override
   public PlanNode visitGroupByLevel(GroupByLevelNode root, 
DistributionPlanContext context) {
     // Firstly, we build the tree structure for GroupByLevelNode
     List<SeriesAggregationSourceNode> sources = 
splitAggregationSourceByPartition(root, context);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
index aa3ba7d7cf..860a7b2c7e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
@@ -491,8 +491,8 @@ public class LogicalPlannerTest {
       Assert.assertFalse(showTimeSeriesNode.isContains());
       Assert.assertEquals("tagK", showTimeSeriesNode.getKey());
       Assert.assertEquals("tagV", showTimeSeriesNode.getValue());
-      Assert.assertEquals(20, showTimeSeriesNode.getLimit());
-      Assert.assertEquals(10, showTimeSeriesNode.getOffset());
+      Assert.assertEquals(30, showTimeSeriesNode.getLimit());
+      Assert.assertEquals(0, showTimeSeriesNode.getOffset());
       Assert.assertTrue(showTimeSeriesNode.isHasLimit());
 
       // test serialize and deserialize
@@ -509,8 +509,8 @@ public class LogicalPlannerTest {
       Assert.assertFalse(showTimeSeriesNode2.isContains());
       Assert.assertEquals("tagK", showTimeSeriesNode2.getKey());
       Assert.assertEquals("tagV", showTimeSeriesNode2.getValue());
-      Assert.assertEquals(20, showTimeSeriesNode2.getLimit());
-      Assert.assertEquals(10, showTimeSeriesNode2.getOffset());
+      Assert.assertEquals(30, showTimeSeriesNode2.getLimit());
+      Assert.assertEquals(0, showTimeSeriesNode2.getOffset());
       Assert.assertTrue(showTimeSeriesNode2.isHasLimit());
     } catch (Exception e) {
       e.printStackTrace();
@@ -530,8 +530,8 @@ public class LogicalPlannerTest {
       Assert.assertNotNull(showDevicesNode);
       Assert.assertEquals(new PartialPath("root.ln.wf01.wt01"), 
showDevicesNode.getPath());
       Assert.assertTrue(showDevicesNode.isHasSgCol());
-      Assert.assertEquals(20, showDevicesNode.getLimit());
-      Assert.assertEquals(10, showDevicesNode.getOffset());
+      Assert.assertEquals(30, showDevicesNode.getLimit());
+      Assert.assertEquals(0, showDevicesNode.getOffset());
       Assert.assertTrue(showDevicesNode.isHasLimit());
 
       // test serialize and deserialize
@@ -542,8 +542,8 @@ public class LogicalPlannerTest {
           (DevicesSchemaScanNode) PlanNodeType.deserialize(byteBuffer);
       Assert.assertNotNull(showDevicesNode2);
       Assert.assertEquals(new PartialPath("root.ln.wf01.wt01"), 
showDevicesNode2.getPath());
-      Assert.assertEquals(20, showDevicesNode2.getLimit());
-      Assert.assertEquals(10, showDevicesNode2.getOffset());
+      Assert.assertEquals(30, showDevicesNode2.getLimit());
+      Assert.assertEquals(0, showDevicesNode2.getOffset());
       Assert.assertTrue(showDevicesNode2.isHasLimit());
     } catch (Exception e) {
       e.printStackTrace();

Reply via email to