This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch EnhancedDeviceCrossRegionIT
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/EnhancedDeviceCrossRegionIT by
this push:
new 2a91621d0ef Consider cross region when process streaming property of
Aggregation in DistributionPlan
2a91621d0ef is described below
commit 2a91621d0ef157f29807d2370ff5483fac95cf97
Author: Weihao Li <[email protected]>
AuthorDate: Tue Sep 2 18:06:07 2025 +0800
Consider cross region when process streaming property of Aggregation in
DistributionPlan
---
.../relational/it/db/it/IoTDBWindowTVFIT.java | 4 ++--
.../it/query/recent/IoTDBTableAggregationIT.java | 22 +++++++++++-----------
.../distribute/TableDistributedPlanGenerator.java | 19 +++++++++++++++++--
3 files changed, 30 insertions(+), 15 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
index f5b4a9787f2..5a25667003d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
@@ -252,7 +252,7 @@ public class IoTDBWindowTVFIT {
"3,2021-01-01T09:05:00.000Z,device1,3,"
};
tableResultSetEqualTest(
- "SELECT window_index, time, device_id, int_val FROM
variation(multi_type, 'int_val', 1.0, false)",
+ "SELECT window_index, time, device_id, int_val FROM
variation(multi_type, 'int_val', 1.0, false) order by window_index, time",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -551,7 +551,7 @@ public class IoTDBWindowTVFIT {
"2021-01-01T09:12:00.000Z,2021-01-01T09:24:00.000Z,TESL,195.0,",
};
tableResultSetEqualTest(
- "SELECT window_start, window_end, stock_id, sum(price) as sum FROM
CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 6m, SIZE => 12m) GROUP BY
window_start, window_end, stock_id ORDER BY stock_id, window_start",
+ "SELECT window_start, window_end, stock_id, sum(price) as sum FROM
CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 6m, SIZE => 12m) GROUP BY
window_start, window_end, stock_id ORDER BY stock_id, window_start, window_end",
expectedHeader,
retArray,
DATABASE_NAME);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
index 7da3bb8ebdd..f60777e2b0a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
@@ -158,12 +158,12 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,2024-09-24T06:16:00.000Z,d01,1,",
};
tableResultSetEqualTest(
- "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time,
device_id, count(*) from table1 where device_id = 'd01' group by 1,device_id",
+ "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time,
device_id, count(*) from table1 where device_id = 'd01' group by 1,device_id
order by 1",
expectedHeader,
retArray,
DATABASE_NAME);
tableResultSetEqualTest(
- "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time,
device_id, count(1) from table1 where device_id = 'd01' group by 1,device_id",
+ "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time,
device_id, count(1) from table1 where device_id = 'd01' group by 1,device_id
order by 1",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -558,7 +558,7 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,2024-09-24T06:16:00.000Z,d16,0,",
};
tableResultSetEqualTest(
- "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time,
device_id, count_if(device_id = 'd01') from table1 group by 1,device_id",
+ "select date_bin(5s, time), (date_bin(5s, time) + 5000) as end_time,
device_id, count_if(device_id = 'd01') from table1 group by 1,device_id order
by device_id, 1",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -836,7 +836,7 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,d01,null,",
};
tableResultSetEqualTest(
- "select date_bin(5s, time), device_id, avg(s3) from table1 where
device_id = 'd01' group by 1, 2",
+ "select date_bin(5s, time), device_id, avg(s3) from table1 where
device_id = 'd01' group by 1, 2 order by 1",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -1016,7 +1016,7 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,d01,null,",
};
tableResultSetEqualTest(
- "select date_bin(5s, time), device_id, sum(s3) from table1 where
device_id = 'd01' group by 1, 2",
+ "select date_bin(5s, time), device_id, sum(s3) from table1 where
device_id = 'd01' group by 1, 2 order by 1",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -1940,7 +1940,7 @@ public class IoTDBTableAggregationIT {
};
tableResultSetEqualTest(
- "select device_id, date_bin(5s, time), max_by(time, s3), max(s3) from
table1 where device_id = 'd01' group by date_bin(5s, time), 1",
+ "select device_id, date_bin(5s, time), max_by(time, s3), max(s3) from
table1 where device_id = 'd01' group by date_bin(5s, time), 1 order by 2",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -2172,7 +2172,7 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,d01,2024-09-24T06:15:55.000Z,55,null,null,55.0,null,null,null,0xcafebabe55,2024-09-24T06:15:55.000Z,null,",
};
tableResultSetEqualTest(
- "select date_bin(5s, time), device_id,
first(time),first(s1),first(s2),first(s3),first(s4),first(s5),first(s6),first(s7),first(s8),first(s9),first(s10)
from table1 where device_id = 'd01' group by 1,2",
+ "select date_bin(5s, time), device_id,
first(time),first(s1),first(s2),first(s3),first(s4),first(s5),first(s6),first(s7),first(s8),first(s9),first(s10)
from table1 where device_id = 'd01' group by 1,2 order by 1",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -2347,7 +2347,7 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,d01,2024-09-24T06:15:55.000Z,55.0,",
};
tableResultSetEqualTest(
- "select date_bin(5s, time), device_id, first_by(time, s4), first(s4)
from table1 where device_id = 'd01' group by 1,2",
+ "select date_bin(5s, time), device_id, first_by(time, s4), first(s4)
from table1 where device_id = 'd01' group by 1,2 order by 1",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -2539,7 +2539,7 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,d01,2024-09-24T06:15:55.000Z,55,null,null,55.0,null,null,null,0xcafebabe55,2024-09-24T06:15:55.000Z,null,",
};
tableResultSetEqualTest(
- "select date_bin(5s, time), device_id,
last(time),last(s1),last(s2),last(s3),last(s4),last(s5),last(s6),last(s7),last(s8),last(s9),last(s10)
from table1 where device_id = 'd01' group by 1,2",
+ "select date_bin(5s, time), device_id,
last(time),last(s1),last(s2),last(s3),last(s4),last(s5),last(s6),last(s7),last(s8),last(s9),last(s10)
from table1 where device_id = 'd01' group by 1,2 order by 1",
expectedHeader,
retArray,
DATABASE_NAME);
@@ -2717,7 +2717,7 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,d01,2024-09-24T06:15:55.000Z,55.0,",
};
repeatTest(
- "select date_bin(5s, time), device_id, last_by(time, s4), last(s4)
from table1 where device_id = 'd01' group by 1,2",
+ "select date_bin(5s, time), device_id, last_by(time, s4), last(s4)
from table1 where device_id = 'd01' group by 1,2 order by 1",
expectedHeader,
retArray,
DATABASE_NAME,
@@ -2883,7 +2883,7 @@ public class IoTDBTableAggregationIT {
"2024-09-24T06:15:55.000Z,d01,null,",
};
tableResultSetEqualTest(
- "select date_bin(5s, time), device_id,extreme(s3) from table1 where
device_id = 'd01' group by 1, 2",
+ "select date_bin(5s, time), device_id,extreme(s3) from table1 where
device_id = 'd01' group by 1, 2 order by 1",
expectedHeader,
retArray,
DATABASE_NAME);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 5d52e6c05ec..fb69e66f0db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -710,6 +710,7 @@ public class TableDistributedPlanGenerator
regionReplicaSet,
regionDeviceCount.getOrDefault(regionReplicaSet, 0) + 1));
if (regionReplicaSets.size() != 1) {
crossRegionDevices.add(deviceEntry);
+ context.deviceCrossRegion = true;
continue;
}
final DeviceTableScanNode deviceTableScanNode =
@@ -794,6 +795,9 @@ public class TableDistributedPlanGenerator
deviceEntry.getDeviceID(),
node.getTimeFilter(),
cachedSeriesSlotWithRegions);
+ if (regionReplicaSets.size() > 1) {
+ context.deviceCrossRegion = true;
+ }
for (final TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
final DeviceTableScanNode deviceTableScanNode =
tableScanNodeMap.computeIfAbsent(
@@ -882,6 +886,9 @@ public class TableDistributedPlanGenerator
node.getTimeFilter(),
cachedSeriesSlotWithRegions);
+ if (regionReplicaSets.size() > 1) {
+ context.deviceCrossRegion = true;
+ }
for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
boolean aligned = deviceEntry instanceof AlignedDeviceEntry;
Pair<TreeAlignedDeviceViewScanNode, TreeNonAlignedDeviceViewScanNode>
pair =
@@ -1034,9 +1041,15 @@ public class TableDistributedPlanGenerator
"Should never reach here. Child ordering: %s.
PreGroupedSymbols: %s",
childOrdering.getOrderBy(), node.getPreGroupedSymbols()));
}
+ } else if (context.deviceCrossRegion) {
+ // The child has no ordering and at least one device spans multiple regions, so the
+ // grouped property of the child is not guaranteed; clear the pre-grouped symbols of the AggNode
+ node.setPreGroupedSymbols(ImmutableList.of());
+ context.deviceCrossRegion = false;
}
- // Child has no Ordering, do nothing here because the logical optimizer
- // 'TransformAggregationToStreamable' will ensure the grouped property
of child
+ // The child has no ordering and no device crosses regions: do nothing here, because the
+ // logical optimizer 'TransformAggregationToStreamable' ensures the grouped property of
+ // the child
}
if (childrenNodes.size() == 1) {
@@ -1132,6 +1145,7 @@ public class TableDistributedPlanGenerator
cachedSeriesSlotWithRegions);
if (regionReplicaSets.size() > 1) {
needSplit = true;
+ context.deviceCrossRegion = true;
}
regionReplicaSetsList.add(regionReplicaSets);
}
@@ -1772,6 +1786,7 @@ public class TableDistributedPlanGenerator
boolean pushDownGrouping = false;
OrderingScheme expectedOrderingScheme;
TRegionReplicaSet mostUsedRegion;
+ boolean deviceCrossRegion;
public PlanContext() {
this.nodeDistributionMap = new HashMap<>();