This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/-dev1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 13fcde335ee3603608d7df90d0e6f54774348d91 Author: Beyyes <[email protected]> AuthorDate: Fri Nov 29 15:03:30 2024 +0800 Fix error in single device with sort + offset + limit align by device query --- .../IoTDBOrderByLimitOffsetAlignByDeviceIT.java | 23 ++++++- .../plan/planner/LogicalPlanBuilder.java | 5 +- .../plan/optimization/LimitOffsetPushDownTest.java | 71 ++++++++++++++++++++++ .../plan/optimization/OptimizationTestUtil.java | 2 + .../plan/optimization/TestPlanBuilder.java | 45 ++++++++++++++ 5 files changed, 142 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java index a595c5a96cb..60cf1a5d5f0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java @@ -53,11 +53,30 @@ public class IoTDBOrderByLimitOffsetAlignByDeviceIT { EnvFactory.getEnv().cleanClusterEnvironment(); } + String[] expectedHeader; + String[] retArray; + + @Test + public void singleDeviceTest() { + expectedHeader = new String[] {"Time,Device,precipitation"}; + retArray = new String[] {"1668960000200,root.weather.London,1667492178318,"}; + resultSetEqualTest( + "select precipitation from root.weather.London where precipitation>1667492178118 order by time offset 1 limit 1 align by device", + expectedHeader, + retArray); + + retArray = new String[] {"1668960000200,root.weather.London,1667492178318,"}; + resultSetEqualTest( + "select precipitation from root.weather.London where precipitation>1667492178118 order by precipitation offset 1 limit 1 align by device", + expectedHeader, + retArray); + } + @Test public void orderByCanNotPushLimitTest() { // 1. value filter, can not push down LIMIT - String[] expectedHeader = new String[] {"Time,Device,s1"}; - String[] retArray = new String[] {"3,root.db.d1,111,"}; + expectedHeader = new String[] {"Time,Device,s1"}; + retArray = new String[] {"3,root.db.d1,111,"}; resultSetEqualTest( "SELECT * FROM root.db.** WHERE s1>40 ORDER BY TIME LIMIT 1 ALIGN BY DEVICE;", expectedHeader, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 94332ea7871..3c234e8ba44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -514,7 +514,7 @@ public class LogicalPlanBuilder { ? queryStatement.getRowOffset() + queryStatement.getRowLimit() : queryStatement.getRowLimit(); - if (canUseTopKNode(queryStatement, limitValue)) { + if (canUseTopKNode(queryStatement, limitValue) && deviceNameToSourceNodesMap.size() > 1) { TopKNode topKNode = new TopKNode( context.getQueryId().genPlanNodeId(), @@ -546,7 +546,8 @@ public class LogicalPlanBuilder { analysis.setUseTopKNode(); this.root = topKNode; - } else if (canUseMergeSortNode(queryStatement, deviceNameToSourceNodesMap.size())) { + } else if (canUseMergeSortNode(queryStatement, deviceNameToSourceNodesMap.size()) + && deviceNameToSourceNodesMap.size() > 1) { // use MergeSortNode + SingleDeviceViewNode MergeSortNode mergeSortNode = new MergeSortNode( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java index d4646994484..bdaba989262 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java @@ -30,8 +30,12 @@ import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; import org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent; +import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.junit.Assert; @@ -51,6 +55,7 @@ import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory. import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.timeSeries; import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.schemaMap; +/** Use optimize rule: LimitOffsetPushDown and OrderByExpressionWithLimitChangeToTopK */ public class LimitOffsetPushDownTest { @Test @@ -139,6 +144,7 @@ public class LimitOffsetPushDownTest { @Test public void testPushDownAlignByDevice() { + // non aligned device checkPushDown( "select s1 from root.sg.d1 limit 100 offset 100 align by device;", new TestPlanBuilder() @@ -151,6 +157,71 @@ public class LimitOffsetPushDownTest { .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100) .singleDeviceView("1", "root.sg.d1", "s1") .getRoot()); + + OrderByParameter orderByParameter = + new OrderByParameter( + Arrays.asList( + new SortItem(OrderByKey.TIME, Ordering.ASC), + new SortItem(OrderByKey.DEVICE, Ordering.ASC))); + checkPushDown( + "select s1 from root.sg.d1 order by time asc limit 100 offset 100 align by device;", + new TestPlanBuilder() + .scan("0", schemaMap.get("root.sg.d1.s1"), 200) + .singleOrderedDeviceView("1", "root.sg.d1", orderByParameter, "s1") + .offset("2", 100) + .limit("3", 100) + .getRoot(), + new TestPlanBuilder() + .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100) + .singleOrderedDeviceView("1", "root.sg.d1", orderByParameter, "s1") + .getRoot()); + + // can not push down + orderByParameter = + new OrderByParameter( + Arrays.asList( + new SortItem("s1", Ordering.ASC), + new SortItem("DEVICE", Ordering.ASC), + new SortItem("TIME", Ordering.ASC))); + checkPushDown( + "select s1 from root.sg.d1 order by s1 asc limit 100 offset 100 align by device;", + new TestPlanBuilder() + .scan("0", schemaMap.get("root.sg.d1.s1")) + .singleOrderedDeviceView("1", "root.sg.d1", orderByParameter, "s1") + .sort("2", orderByParameter) + .offset("3", 100) + .limit("4", 100) + .getRoot(), + new TestPlanBuilder() + .scan("0", schemaMap.get("root.sg.d1.s1")) + .singleOrderedDeviceView("1", "root.sg.d1", orderByParameter, "s1") + .topK("5", 200, orderByParameter, Arrays.asList("Device", "s1")) + .offset("3", 100) + .limit("4", 100) + .getRoot()); + + orderByParameter = + new OrderByParameter( + Arrays.asList( + new SortItem("s1", Ordering.ASC), + new SortItem("DEVICE", Ordering.ASC), + new SortItem("TIME", Ordering.ASC))); + checkPushDown( + "select s1,s2 from root.sg.d2.a order by s1 asc limit 100 offset 100 align by device;", + new TestPlanBuilder() + .scanAligned("0", schemaMap.get("root.sg.d2.a")) + .singleOrderedDeviceView("1", "root.sg.d2.a", orderByParameter, "s1", "s2") + .sort("2", orderByParameter) + .offset("3", 100) + .limit("4", 100) + .getRoot(), + new TestPlanBuilder() + .scanAligned("0", schemaMap.get("root.sg.d2.a")) + .singleOrderedDeviceView("1", "root.sg.d2.a", orderByParameter, "s1", "s2") + .topK("5", 200, orderByParameter, Arrays.asList("Device", "s1")) + .offset("3", 100) + .limit("4", 100) + .getRoot()); } @Test diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java index 2df3f0ef124..5cd8d599dd3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java @@ -129,6 +129,8 @@ public class OptimizationTestUtil { Assert.assertEquals(rawPlan, actualPlan); PlanNode actualOptPlan = optimizer.optimize(actualPlan, analysis, context); + actualOptPlan = + new OrderByExpressionWithLimitChangeToTopK().optimize(actualOptPlan, analysis, context); Assert.assertEquals(optPlan, actualOptPlan); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java index 372b803d954..abffd5e7be9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java @@ -35,6 +35,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; @@ -79,6 +81,13 @@ public class TestPlanBuilder { return this; } + public TestPlanBuilder scan(String id, PartialPath path, long pushDownLimit) { + SeriesScanNode node = new SeriesScanNode(new PlanNodeId(id), (MeasurementPath) path); + node.setPushDownLimit(pushDownLimit); + this.root = node; + return this; + } + public TestPlanBuilder scanAligned(String id, PartialPath path) { this.root = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path); return this; @@ -356,6 +365,42 @@ public class TestPlanBuilder { return this; } + public TestPlanBuilder sort(String id, OrderByParameter orderParameter) { + this.root = new SortNode(new PlanNodeId(id), getRoot(), orderParameter); + return this; + } + + public TestPlanBuilder topK( + String id, int topKValue, OrderByParameter mergeOrderParameter, List<String> outputColumns) { + this.root = + new TopKNode( + new PlanNodeId(id), + topKValue, + Collections.singletonList(getRoot()), + mergeOrderParameter, + outputColumns); + return this; + } + + public TestPlanBuilder singleOrderedDeviceView( + String id, String device, OrderByParameter orderByParameter, String... measurement) { + IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(device); + Map<IDeviceID, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>(); + deviceToMeasurementIndexesMap.put( + deviceID, measurement.length == 1 ? Collections.singletonList(1) : Arrays.asList(1, 2)); + DeviceViewNode deviceViewNode = + new DeviceViewNode( + new PlanNodeId(id), + orderByParameter, + measurement.length == 1 + ? Arrays.asList(DEVICE, measurement[0]) + : Arrays.asList(DEVICE, measurement[0], measurement[1]), + deviceToMeasurementIndexesMap); + deviceViewNode.addChildDeviceNode(deviceID, getRoot()); + this.root = deviceViewNode; + return this; + } + public TestPlanBuilder filter( String id, List<Expression> expressions, Expression predicate, boolean isGroupByTime) { this.root =
