This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch EnhancedDeviceCrossRegionIT
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/EnhancedDeviceCrossRegionIT by 
this push:
     new 2a91621d0ef Consider cross region when process streaming property of 
Aggregation in DistributionPlan
2a91621d0ef is described below

commit 2a91621d0ef157f29807d2370ff5483fac95cf97
Author: Weihao Li <[email protected]>
AuthorDate: Tue Sep 2 18:06:07 2025 +0800

    Consider cross region when process streaming property of Aggregation in 
DistributionPlan
---
 .../relational/it/db/it/IoTDBWindowTVFIT.java      |  4 ++--
 .../it/query/recent/IoTDBTableAggregationIT.java   | 22 +++++++++++-----------
 .../distribute/TableDistributedPlanGenerator.java  | 19 +++++++++++++++++--
 3 files changed, 30 insertions(+), 15 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
index f5b4a9787f2..5a25667003d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
@@ -252,7 +252,7 @@ public class IoTDBWindowTVFIT {
           "3,2021-01-01T09:05:00.000Z,device1,3,"
         };
     tableResultSetEqualTest(
-        "SELECT window_index, time, device_id, int_val FROM 
variation(multi_type, 'int_val', 1.0, false)",
+        "SELECT window_index, time, device_id, int_val FROM 
variation(multi_type, 'int_val', 1.0, false) order by window_index, time",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -551,7 +551,7 @@ public class IoTDBWindowTVFIT {
           "2021-01-01T09:12:00.000Z,2021-01-01T09:24:00.000Z,TESL,195.0,",
         };
     tableResultSetEqualTest(
-        "SELECT window_start, window_end, stock_id, sum(price) as sum FROM 
CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 6m, SIZE => 12m) GROUP BY 
window_start, window_end, stock_id ORDER BY stock_id, window_start",
+        "SELECT window_start, window_end, stock_id, sum(price) as sum FROM 
CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 6m, SIZE => 12m) GROUP BY 
window_start, window_end, stock_id ORDER BY stock_id, window_start, window_end",
         expectedHeader,
         retArray,
         DATABASE_NAME);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
index 7da3bb8ebdd..f60777e2b0a 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
@@ -158,12 +158,12 @@ public class IoTDBTableAggregationIT {
           "2024-09-24T06:15:55.000Z,2024-09-24T06:16:00.000Z,d01,1,",
         };
     tableResultSetEqualTest(
-        "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time, 
device_id, count(*) from table1 where device_id = 'd01' group by 1,device_id",
+        "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time, 
device_id, count(*) from table1 where device_id = 'd01' group by 1,device_id 
order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
     tableResultSetEqualTest(
-        "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time, 
device_id, count(1) from table1 where device_id = 'd01' group by 1,device_id",
+        "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time, 
device_id, count(1) from table1 where device_id = 'd01' group by 1,device_id 
order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -558,7 +558,7 @@ public class IoTDBTableAggregationIT {
           "2024-09-24T06:15:55.000Z,2024-09-24T06:16:00.000Z,d16,0,",
         };
     tableResultSetEqualTest(
-        "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time, 
device_id, count_if(device_id = 'd01') from table1 group by 1,device_id",
+        "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time, 
device_id, count_if(device_id = 'd01') from table1 group by 1,device_id order 
by device_id, 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -836,7 +836,7 @@ public class IoTDBTableAggregationIT {
           "2024-09-24T06:15:55.000Z,d01,null,",
         };
     tableResultSetEqualTest(
-        "select date_bin(5s, time), device_id, avg(s3) from table1 where 
device_id = 'd01' group by 1, 2",
+        "select date_bin(5s, time), device_id, avg(s3) from table1 where 
device_id = 'd01' group by 1, 2 order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -1016,7 +1016,7 @@ public class IoTDBTableAggregationIT {
           "2024-09-24T06:15:55.000Z,d01,null,",
         };
     tableResultSetEqualTest(
-        "select date_bin(5s, time), device_id, sum(s3) from table1 where 
device_id = 'd01' group by 1, 2",
+        "select date_bin(5s, time), device_id, sum(s3) from table1 where 
device_id = 'd01' group by 1, 2 order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -1940,7 +1940,7 @@ public class IoTDBTableAggregationIT {
         };
 
     tableResultSetEqualTest(
-        "select device_id, date_bin(5s, time), max_by(time, s3), max(s3) from 
table1 where device_id = 'd01' group by date_bin(5s, time), 1",
+        "select device_id, date_bin(5s, time), max_by(time, s3), max(s3) from 
table1 where device_id = 'd01' group by date_bin(5s, time), 1 order by 2",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -2172,7 +2172,7 @@ public class IoTDBTableAggregationIT {
           
"2024-09-24T06:15:55.000Z,d01,2024-09-24T06:15:55.000Z,55,null,null,55.0,null,null,null,0xcafebabe55,2024-09-24T06:15:55.000Z,null,",
         };
     tableResultSetEqualTest(
-        "select date_bin(5s, time), device_id, 
first(time),first(s1),first(s2),first(s3),first(s4),first(s5),first(s6),first(s7),first(s8),first(s9),first(s10)
 from table1 where device_id = 'd01' group by 1,2",
+        "select date_bin(5s, time), device_id, 
first(time),first(s1),first(s2),first(s3),first(s4),first(s5),first(s6),first(s7),first(s8),first(s9),first(s10)
 from table1 where device_id = 'd01' group by 1,2 order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -2347,7 +2347,7 @@ public class IoTDBTableAggregationIT {
           "2024-09-24T06:15:55.000Z,d01,2024-09-24T06:15:55.000Z,55.0,",
         };
     tableResultSetEqualTest(
-        "select date_bin(5s, time), device_id, first_by(time, s4), first(s4) 
from table1 where device_id = 'd01' group by 1,2",
+        "select date_bin(5s, time), device_id, first_by(time, s4), first(s4) 
from table1 where device_id = 'd01' group by 1,2 order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -2539,7 +2539,7 @@ public class IoTDBTableAggregationIT {
           
"2024-09-24T06:15:55.000Z,d01,2024-09-24T06:15:55.000Z,55,null,null,55.0,null,null,null,0xcafebabe55,2024-09-24T06:15:55.000Z,null,",
         };
     tableResultSetEqualTest(
-        "select date_bin(5s, time), device_id, 
last(time),last(s1),last(s2),last(s3),last(s4),last(s5),last(s6),last(s7),last(s8),last(s9),last(s10)
 from table1 where device_id = 'd01' group by 1,2",
+        "select date_bin(5s, time), device_id, 
last(time),last(s1),last(s2),last(s3),last(s4),last(s5),last(s6),last(s7),last(s8),last(s9),last(s10)
 from table1 where device_id = 'd01' group by 1,2 order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
@@ -2717,7 +2717,7 @@ public class IoTDBTableAggregationIT {
           "2024-09-24T06:15:55.000Z,d01,2024-09-24T06:15:55.000Z,55.0,",
         };
     repeatTest(
-        "select date_bin(5s, time), device_id, last_by(time, s4), last(s4) 
from table1 where device_id = 'd01' group by 1,2",
+        "select date_bin(5s, time), device_id, last_by(time, s4), last(s4) 
from table1 where device_id = 'd01' group by 1,2 order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME,
@@ -2883,7 +2883,7 @@ public class IoTDBTableAggregationIT {
           "2024-09-24T06:15:55.000Z,d01,null,",
         };
     tableResultSetEqualTest(
-        "select date_bin(5s, time), device_id,extreme(s3) from table1 where 
device_id = 'd01' group by 1, 2",
+        "select date_bin(5s, time), device_id,extreme(s3) from table1 where 
device_id = 'd01' group by 1, 2 order by 1",
         expectedHeader,
         retArray,
         DATABASE_NAME);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 5d52e6c05ec..fb69e66f0db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -710,6 +710,7 @@ public class TableDistributedPlanGenerator
                   regionReplicaSet, 
regionDeviceCount.getOrDefault(regionReplicaSet, 0) + 1));
       if (regionReplicaSets.size() != 1) {
         crossRegionDevices.add(deviceEntry);
+        context.deviceCrossRegion = true;
         continue;
       }
       final DeviceTableScanNode deviceTableScanNode =
@@ -794,6 +795,9 @@ public class TableDistributedPlanGenerator
               deviceEntry.getDeviceID(),
               node.getTimeFilter(),
               cachedSeriesSlotWithRegions);
+      if (regionReplicaSets.size() > 1) {
+        context.deviceCrossRegion = true;
+      }
       for (final TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
         final DeviceTableScanNode deviceTableScanNode =
             tableScanNodeMap.computeIfAbsent(
@@ -882,6 +886,9 @@ public class TableDistributedPlanGenerator
               node.getTimeFilter(),
               cachedSeriesSlotWithRegions);
 
+      if (regionReplicaSets.size() > 1) {
+        context.deviceCrossRegion = true;
+      }
       for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
         boolean aligned = deviceEntry instanceof AlignedDeviceEntry;
         Pair<TreeAlignedDeviceViewScanNode, TreeNonAlignedDeviceViewScanNode> 
pair =
@@ -1034,9 +1041,15 @@ public class TableDistributedPlanGenerator
                   "Should never reach here. Child ordering: %s. 
PreGroupedSymbols: %s",
                   childOrdering.getOrderBy(), node.getPreGroupedSymbols()));
         }
+      } else if (context.deviceCrossRegion) {
+        // Child has no Ordering and the device cross region, the grouped 
property of child is not
+        // ensured, so we need to clear the attribute of AggNode
+        node.setPreGroupedSymbols(ImmutableList.of());
+        context.deviceCrossRegion = false;
       }
-      // Child has no Ordering, do nothing here because the logical optimizer
-      // 'TransformAggregationToStreamable' will ensure the grouped property 
of child
+      // Child has no Ordering and the device doesn't cross region, do nothing 
here because the
+      // logical optimizer 'TransformAggregationToStreamable' will ensure the 
grouped property of
+      // child
     }
 
     if (childrenNodes.size() == 1) {
@@ -1132,6 +1145,7 @@ public class TableDistributedPlanGenerator
                 cachedSeriesSlotWithRegions);
         if (regionReplicaSets.size() > 1) {
           needSplit = true;
+          context.deviceCrossRegion = true;
         }
         regionReplicaSetsList.add(regionReplicaSets);
       }
@@ -1772,6 +1786,7 @@ public class TableDistributedPlanGenerator
     boolean pushDownGrouping = false;
     OrderingScheme expectedOrderingScheme;
     TRegionReplicaSet mostUsedRegion;
+    boolean deviceCrossRegion;
 
     public PlanContext() {
       this.nodeDistributionMap = new HashMap<>();

Reply via email to