This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6702808c25 [IOTDB-3414] [mpp] fix schema query limit offset bug (#6179)
6702808c25 is described below
commit 6702808c25ba78e8b53364eef93771e35405efee
Author: xinzhongtianxia <[email protected]>
AuthorDate: Sun Jun 19 22:13:29 2022 +0800
[IOTDB-3414] [mpp] fix schema query limit offset bug (#6179)
---
.../mpp/plan/analyze/FakePartitionFetcherImpl.java | 53 +++++++++++++++-
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 72 +++++++++++++++++-----
.../plan/planner/distribution/SourceRewriter.java | 7 +--
.../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java | 16 ++---
4 files changed, 116 insertions(+), 32 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
index 8738ae9cd9..19f7b43fed 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakePartitionFetcherImpl.java
@@ -43,7 +43,58 @@ public class FakePartitionFetcherImpl implements
IPartitionFetcher {
@Override
public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
- return null;
+ String device1 = "root.sg.d1";
+ String device2 = "root.sg.d22";
+ String device3 = "root.sg.d333";
+
+ SchemaPartition schemaPartition =
+ new SchemaPartition(
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>();
+
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> regionMap = new HashMap<>();
+ TRegionReplicaSet region1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(11)
+ .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
+ new TDataNodeLocation()
+ .setDataNodeId(12)
+ .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000))));
+ regionMap.put(new TSeriesPartitionSlot(device1.length()), region1);
+
+ TRegionReplicaSet region2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 2),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(31)
+ .setExternalEndPoint(new TEndPoint("192.0.3.1", 9000)),
+ new TDataNodeLocation()
+ .setDataNodeId(32)
+ .setExternalEndPoint(new TEndPoint("192.0.3.2", 9000))));
+ regionMap.put(new TSeriesPartitionSlot(device2.length()), region2);
+
+ TRegionReplicaSet region3 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 3),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(11)
+ .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
+ new TDataNodeLocation()
+ .setDataNodeId(12)
+ .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000))));
+ regionMap.put(new TSeriesPartitionSlot(device3.length()), region3);
+
+ schemaPartitionMap.put("root.sg", regionMap);
+
+ schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+ return schemaPartition;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 8221811da1..b94e106bdb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -442,14 +442,27 @@ public class LogicalPlanner {
public PlanNode visitShowTimeSeries(
ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext
context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+
+ // If there is only one region, we can push down the offset and limit operation to
+ // source operator.
+ boolean canPushDownOffsetLimit =
+ analysis.getSchemaPartitionInfo() != null
+ && analysis.getSchemaPartitionInfo().getDistributionInfo().size() == 1;
+
+ int limit = showTimeSeriesStatement.getLimit();
+ int offset = showTimeSeriesStatement.getOffset();
+ if (!canPushDownOffsetLimit) {
+ limit = showTimeSeriesStatement.getLimit() + showTimeSeriesStatement.getOffset();
+ offset = 0;
+ }
planBuilder =
planBuilder
.planTimeSeriesSchemaSource(
showTimeSeriesStatement.getPathPattern(),
showTimeSeriesStatement.getKey(),
showTimeSeriesStatement.getValue(),
- showTimeSeriesStatement.getLimit(),
- showTimeSeriesStatement.getOffset(),
+ limit,
+ offset,
showTimeSeriesStatement.isOrderByHeat(),
showTimeSeriesStatement.isContains(),
showTimeSeriesStatement.isPrefixPath())
@@ -464,27 +477,52 @@ public class LogicalPlanner {
.getRoot();
planBuilder = planBuilder.planSchemaQueryOrderByHeat(lastPlanNode);
}
- return planBuilder
- .planOffset(showTimeSeriesStatement.getOffset())
- .planLimit(showTimeSeriesStatement.getLimit())
- .getRoot();
+
+ if (!canPushDownOffsetLimit) {
+ return planBuilder
+ .planOffset(showTimeSeriesStatement.getOffset())
+ .planLimit(showTimeSeriesStatement.getLimit())
+ .getRoot();
+ }
+
+ return planBuilder.getRoot();
}
@Override
public PlanNode visitShowDevices(
ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
- return planBuilder
- .planDeviceSchemaSource(
- showDevicesStatement.getPathPattern(),
- showDevicesStatement.getLimit(),
- showDevicesStatement.getOffset(),
- showDevicesStatement.isPrefixPath(),
- showDevicesStatement.hasSgCol())
- .planSchemaQueryMerge(false)
- .planOffset(showDevicesStatement.getOffset())
- .planLimit(showDevicesStatement.getLimit())
- .getRoot();
+
+ // If there is only one region, we can push down the offset and limit operation to
+ // source operator.
+ boolean canPushDownOffsetLimit =
+ analysis.getSchemaPartitionInfo() != null
+ && analysis.getSchemaPartitionInfo().getDistributionInfo().size() == 1;
+
+ int limit = showDevicesStatement.getLimit();
+ int offset = showDevicesStatement.getOffset();
+ if (!canPushDownOffsetLimit) {
+ limit = showDevicesStatement.getLimit() + showDevicesStatement.getOffset();
+ offset = 0;
+ }
+
+ planBuilder =
+ planBuilder
+ .planDeviceSchemaSource(
+ showDevicesStatement.getPathPattern(),
+ limit,
+ offset,
+ showDevicesStatement.isPrefixPath(),
+ showDevicesStatement.hasSgCol())
+ .planSchemaQueryMerge(false);
+
+ if (!canPushDownOffsetLimit) {
+ return planBuilder
+ .planOffset(showDevicesStatement.getOffset())
+ .planLimit(showDevicesStatement.getLimit())
+ .getRoot();
+ }
+ return planBuilder.getRoot();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 070256c39e..747b9933d1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -177,17 +177,11 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
(deviceGroupId, schemaRegionReplicaSet) ->
schemaRegions.add(schemaRegionReplicaSet));
});
- int count = schemaRegions.size();
schemaRegions.forEach(
region -> {
SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode)
seed.clone();
schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
schemaQueryScanNode.setRegionReplicaSet(region);
- if (count > 1) {
- schemaQueryScanNode.setLimit(
- schemaQueryScanNode.getOffset() + schemaQueryScanNode.getLimit());
- schemaQueryScanNode.setOffset(0);
- }
root.addChild(schemaQueryScanNode);
});
return root;
@@ -530,6 +524,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
return aggregationNode;
}
+ @Override
public PlanNode visitGroupByLevel(GroupByLevelNode root,
DistributionPlanContext context) {
// Firstly, we build the tree structure for GroupByLevelNode
List<SeriesAggregationSourceNode> sources =
splitAggregationSourceByPartition(root, context);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
index aa3ba7d7cf..860a7b2c7e 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
@@ -491,8 +491,8 @@ public class LogicalPlannerTest {
Assert.assertFalse(showTimeSeriesNode.isContains());
Assert.assertEquals("tagK", showTimeSeriesNode.getKey());
Assert.assertEquals("tagV", showTimeSeriesNode.getValue());
- Assert.assertEquals(20, showTimeSeriesNode.getLimit());
- Assert.assertEquals(10, showTimeSeriesNode.getOffset());
+ Assert.assertEquals(30, showTimeSeriesNode.getLimit());
+ Assert.assertEquals(0, showTimeSeriesNode.getOffset());
Assert.assertTrue(showTimeSeriesNode.isHasLimit());
// test serialize and deserialize
@@ -509,8 +509,8 @@ public class LogicalPlannerTest {
Assert.assertFalse(showTimeSeriesNode2.isContains());
Assert.assertEquals("tagK", showTimeSeriesNode2.getKey());
Assert.assertEquals("tagV", showTimeSeriesNode2.getValue());
- Assert.assertEquals(20, showTimeSeriesNode2.getLimit());
- Assert.assertEquals(10, showTimeSeriesNode2.getOffset());
+ Assert.assertEquals(30, showTimeSeriesNode2.getLimit());
+ Assert.assertEquals(0, showTimeSeriesNode2.getOffset());
Assert.assertTrue(showTimeSeriesNode2.isHasLimit());
} catch (Exception e) {
e.printStackTrace();
@@ -530,8 +530,8 @@ public class LogicalPlannerTest {
Assert.assertNotNull(showDevicesNode);
Assert.assertEquals(new PartialPath("root.ln.wf01.wt01"),
showDevicesNode.getPath());
Assert.assertTrue(showDevicesNode.isHasSgCol());
- Assert.assertEquals(20, showDevicesNode.getLimit());
- Assert.assertEquals(10, showDevicesNode.getOffset());
+ Assert.assertEquals(30, showDevicesNode.getLimit());
+ Assert.assertEquals(0, showDevicesNode.getOffset());
Assert.assertTrue(showDevicesNode.isHasLimit());
// test serialize and deserialize
@@ -542,8 +542,8 @@ public class LogicalPlannerTest {
(DevicesSchemaScanNode) PlanNodeType.deserialize(byteBuffer);
Assert.assertNotNull(showDevicesNode2);
Assert.assertEquals(new PartialPath("root.ln.wf01.wt01"),
showDevicesNode2.getPath());
- Assert.assertEquals(20, showDevicesNode2.getLimit());
- Assert.assertEquals(10, showDevicesNode2.getOffset());
+ Assert.assertEquals(30, showDevicesNode2.getLimit());
+ Assert.assertEquals(0, showDevicesNode2.getOffset());
Assert.assertTrue(showDevicesNode2.isHasLimit());
} catch (Exception e) {
e.printStackTrace();