This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixLastQueryScanNodeSort in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d6ef8cbf482e7fd5d8f3378698385d52cb79f336 Author: shuwenwei <[email protected]> AuthorDate: Fri Dec 26 11:43:24 2025 +0800 fix LastQueryScanNode sort --- .../plan/planner/distribution/SourceRewriter.java | 19 ++++---- .../plan/node/source/LastQueryScanNode.java | 6 ++- .../plan/planner/distribution/LastQueryTest.java | 57 ++++++++++++++++++++++ 3 files changed, 72 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 79489012210..718c1c469ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -1086,15 +1086,16 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> if (child instanceof LastQueryScanNode) { // sort the measurements for LastQueryMergeOperator LastQueryScanNode node = (LastQueryScanNode) child; - ((LastQueryScanNode) child) - .getIdxOfMeasurementSchemas() - .sort( - Comparator.comparing( - idx -> - new Binary( - node.getMeasurementSchema(idx).getMeasurementName(), - TSFileConfig.STRING_CHARSET), - Comparator.naturalOrder())); + List<Integer> sorted = + new ArrayList<>(((LastQueryScanNode) child).getIdxOfMeasurementSchemas()); + sorted.sort( + Comparator.comparing( + idx -> + new Binary( + node.getMeasurementSchema(idx).getMeasurementName(), + TSFileConfig.STRING_CHARSET), + Comparator.naturalOrder())); + node.setIndexOfMeasurementSchemas(sorted); } }); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java index 74f53791382..24d9970386f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java @@ -59,7 +59,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode { private final PartialPath devicePath; private final boolean aligned; - private final List<Integer> indexOfMeasurementSchemas; + private List<Integer> indexOfMeasurementSchemas; // This structure does not need to be serialized or deserialized. // It will be set when the current Node is added to the child by the upper LastQueryNode. private List<IMeasurementSchema> globalMeasurementSchemaList; @@ -367,6 +367,10 @@ public class LastQueryScanNode extends LastSeriesSourceNode { return indexOfMeasurementSchemas; } + public void setIndexOfMeasurementSchemas(List<Integer> indexOfMeasurementSchemas) { + this.indexOfMeasurementSchemas = indexOfMeasurementSchemas; + } + public List<IMeasurementSchema> getMeasurementSchemas() { return indexOfMeasurementSchemas.stream() .map(globalMeasurementSchemaList::get) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java index 86eb189680f..b077b38eb3e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java @@ -22,16 +22,22 @@ package org.apache.iotdb.db.queryengine.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; @@ -41,6 +47,57 @@ import java.util.List; public class LastQueryTest { + @Test + public void testSortLastQueryScanNode() throws IllegalPathException { + LastQueryNode lastQueryNode = new LastQueryNode(new PlanNodeId("test"), null, true); + + lastQueryNode.addDeviceLastQueryScanNode( + new PlanNodeId("test_last_query_scan1"), + new PartialPath("root.test.d1"), + true, + Arrays.asList( + new MeasurementSchema("s3", TSDataType.INT32), + new MeasurementSchema("s1", TSDataType.BOOLEAN), + new MeasurementSchema("s2", TSDataType.INT32)), + null, + null); + lastQueryNode.addDeviceLastQueryScanNode( + new PlanNodeId("test_last_query_scan2"), + new PartialPath("root.test.d0"), + false, + Collections.singletonList(new MeasurementSchema("s0", TSDataType.BOOLEAN)), + null, + null); + + Analysis analysis = Util.constructAnalysis(); + SourceRewriter sourceRewriter = new SourceRewriter(analysis); + DistributionPlanContext context = + new DistributionPlanContext( + new MPPQueryContext("", new QueryId("test"), null, new TEndPoint(), new TEndPoint())); + context.setOneSeriesInMultiRegion(true); + context.setQueryMultiRegion(true); + List<PlanNode> result = sourceRewriter.visitLastQuery(lastQueryNode, context); + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.get(0) instanceof LastQueryMergeNode); + LastQueryMergeNode mergeNode = (LastQueryMergeNode) result.get(0); + Assert.assertEquals(1, mergeNode.getChildren().size()); + Assert.assertTrue(mergeNode.getChildren().get(0) instanceof LastQueryNode); + + LastQueryNode lastQueryNode2 = (LastQueryNode) mergeNode.getChildren().get(0); + Assert.assertEquals(2, lastQueryNode2.getChildren().size()); + Assert.assertTrue(lastQueryNode2.getChildren().get(0) instanceof LastQueryScanNode); + + LastQueryScanNode scanNodeChild1 = (LastQueryScanNode) lastQueryNode2.getChildren().get(0); + Assert.assertTrue(scanNodeChild1.getDevicePath().toString().contains("d0")); + Assert.assertEquals("s0", scanNodeChild1.getMeasurementSchemas().get(0).getMeasurementName()); + + LastQueryScanNode scanNodeChild2 = (LastQueryScanNode) lastQueryNode2.getChildren().get(1); + Assert.assertTrue(scanNodeChild2.getDevicePath().toString().contains("d1")); + Assert.assertEquals("s1", scanNodeChild2.getMeasurementSchemas().get(0).getMeasurementName()); + Assert.assertEquals("s2", scanNodeChild2.getMeasurementSchemas().get(1).getMeasurementName()); + Assert.assertEquals("s3", scanNodeChild2.getMeasurementSchemas().get(2).getMeasurementName()); + } + @Test public void testLastQuery1Series1Region() throws IllegalPathException { String d2s1Path = "root.sg.d22.s1";
