Copilot commented on code in PR #1740:
URL: https://github.com/apache/auron/pull/1740#discussion_r2661265734
##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala:
##########
@@ -288,16 +281,38 @@ class ShimsImpl extends Shims with Logging {
child: SparkPlan): NativeGenerateBase =
NativeGenerateExec(generator, requiredChildOutput, outer, generatorOutput, child)
- override def createNativeGlobalLimitExec(limit: Long, child: SparkPlan): NativeGlobalLimitBase =
- NativeGlobalLimitExec(limit, child)
+ private def effectiveLimit(rawLimit: Int): Int =
+ if (rawLimit == -1) Int.MaxValue else rawLimit
Review Comment:
The `effectiveLimit` function converts -1 to Int.MaxValue without any bounds
checking or validation. If the limit is set to Int.MaxValue and offset is
non-zero, this could lead to overflow issues when computing `limit + offset` in
operations that need to fetch enough data to satisfy both the limit and offset
requirements. Consider adding validation or using a safer data type for
internal calculations.
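As an illustration of the overflow concern (a sketch with hypothetical names, not code from this PR), the addition can be widened to `i64` before clamping back, so that an unbounded limit combined with a non-zero offset saturates instead of wrapping:

```rust
// Sketch: compute the number of rows to fetch for LIMIT/OFFSET without
// risking i32 overflow, by widening to i64 and saturating at i32::MAX.
// `effective_limit` mirrors the -1 => "no limit" convention from the review;
// `fetch_count` is an illustrative helper, not part of the PR.
fn effective_limit(raw_limit: i32) -> i32 {
    if raw_limit == -1 { i32::MAX } else { raw_limit }
}

fn fetch_count(raw_limit: i32, offset: i32) -> i32 {
    let limit = effective_limit(raw_limit) as i64;
    let total = limit + offset as i64; // cannot overflow in i64
    total.min(i32::MAX as i64) as i32 // saturate instead of wrapping
}

fn main() {
    assert_eq!(fetch_count(10, 3), 13);
    // With an unbounded limit, plain i32 addition would wrap; here it saturates.
    assert_eq!(fetch_count(-1, 5), i32::MAX);
}
```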
##########
native-engine/datafusion-ext-plans/src/limit_exec.rs:
##########
@@ -131,11 +141,49 @@ fn execute_limit(
while remaining > 0
&& let Some(mut batch) = input.next().await.transpose()?
{
- if remaining < batch.num_rows() as u64 {
- batch = batch.slice(0, remaining as usize);
+ if remaining < batch.num_rows() {
+ batch = batch.slice(0, remaining);
+ remaining = 0;
+ } else {
+ remaining -= batch.num_rows();
+ }
+ exec_ctx.baseline_metrics().record_output(batch.num_rows());
+ sender.send(batch).await;
+ }
+ Ok(())
+ }))
+}
+
+fn execute_limit_with_offset(
+ mut input: SendableRecordBatchStream,
+ limit: usize,
+ offset: usize,
+ exec_ctx: Arc<ExecutionContext>,
+) -> Result<SendableRecordBatchStream> {
+ Ok(exec_ctx
+ .clone()
+ .output_with_sender("Limit", move |sender| async move {
+ let mut skip = offset;
+ let mut remaining = limit - skip;
Review Comment:
The initialization of `remaining` is incorrect. According to SQL
LIMIT/OFFSET semantics, after skipping `offset` rows, we should return up to
`limit` rows from what remains. The calculation should be `remaining = limit`,
not `remaining = limit - skip`. The current implementation subtracts the offset
from the limit, which means "skip N rows, then return (limit - N) rows", which
is incorrect.
```suggestion
let mut remaining = limit;
```
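The intended semantics can be sketched on a plain iterator (illustrative names, not the PR's API): skip `offset` rows first, then return up to `limit` rows from whatever remains.

```rust
// Sketch of SQL LIMIT/OFFSET semantics: drop the first `offset` rows, then
// take up to `limit` of the remaining rows. With `remaining = limit - skip`,
// the buggy variant would return fewer rows than the limit allows.
fn limit_offset(rows: Vec<i32>, limit: usize, offset: usize) -> Vec<i32> {
    rows.into_iter().skip(offset).take(limit).collect()
}

fn main() {
    let rows: Vec<i32> = (0..10).collect();
    // offset=3, limit=8: skip 0,1,2 and return all 7 remaining rows.
    assert_eq!(limit_offset(rows, 8, 3), vec![3, 4, 5, 6, 7, 8, 9]);
}
```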
##########
native-engine/datafusion-ext-plans/src/sort_exec.rs:
##########
@@ -1487,6 +1522,40 @@ mod test {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_sort_i32_with_skip() -> Result<()> {
+ MemManager::init(100);
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let input = build_table(
+ ("a", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]),
+ ("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+ ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
+ );
+ let sort_exprs = vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ }];
+
+ let sort = SortExec::new(input, sort_exprs, Some(8), 3);
+ let output = sort.execute(0, task_ctx)?;
+ let batches = common::collect(output).await?;
+ let expected = vec![
+ "+---+---+---+",
+ "| a | b | c |",
+ "+---+---+---+",
+ "| 3 | 6 | 1 |",
+ "| 4 | 5 | 0 |",
+ "| 5 | 4 | 9 |",
+ "| 6 | 3 | 8 |",
+ "| 7 | 2 | 7 |",
Review Comment:
The test expectations are incorrect. With sorted input (0-9), limit=8, and
offset=3, the expected behavior should be: skip 3 rows (0,1,2), then return up
to 8 rows from the remaining data (3,4,5,6,7,8,9). Since 7 rows remain after
the offset and limit is 8, all 7 rows should be returned (3,4,5,6,7,8,9), not
just 5 rows (3,4,5,6,7).
```suggestion
"| 7 | 2 | 7 |",
"| 8 | 1 | 6 |",
"| 9 | 0 | 5 |",
```
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala:
##########
@@ -81,35 +82,37 @@ abstract class NativeTakeOrderedBase(
val ord = new LazilyGeneratedOrdering(sortOrder, output)
// all partitions are sorted, so perform a sorted-merge to achieve the result
- partial
- .execute()
- .map(_.copy())
- .mapPartitions(iter => Iterator.single(iter.toArray))
- .reduce { case (array1, array2) =>
- val result = ArrayBuffer[InternalRow]()
- var i = 0
- var j = 0
-
- while (result.length < limit && (i < array1.length || j < array2.length)) {
- 0 match {
- case _ if i == array1.length =>
- result.append(array2(j))
- j += 1
- case _ if j == array2.length =>
- result.append(array1(i))
- i += 1
- case _ =>
- if (ord.compare(array1(i), array2(j)) <= 0) {
- result.append(array1(i))
- i += 1
- } else {
+ val rows =
+ partial
+ .execute()
+ .map(_.copy())
+ .mapPartitions(iter => Iterator.single(iter.toArray))
+ .reduce { case (array1, array2) =>
+ val result = ArrayBuffer[InternalRow]()
+ var i = 0
+ var j = 0
+
+ while (result.length < limit && (i < array1.length || j < array2.length)) {
Review Comment:
The merge logic should collect `limit + offset` rows before applying the
offset, not just `limit` rows. Currently, the while loop condition checks
`result.length < limit`, which means it only collects `limit` rows total. After
applying `drop(offset)`, you may end up with fewer rows than expected. The
condition should be `result.length < limit + offset` to ensure enough rows are
collected before the offset is applied.
```suggestion
while (result.length < limit + offset && (i < array1.length || j < array2.length)) {
```
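A minimal two-way merge sketch (in Rust for illustration; the PR's code is Scala, and the names here are hypothetical) shows why the bound must be `limit + offset`: the merge has to gather enough rows that dropping the first `offset` still leaves `limit` rows.

```rust
// Sketch: a two-way sorted merge that collects limit + offset rows before
// dropping the first `offset`. With a bound of only `limit`, the result
// would come up short after the offset is applied.
fn merge_take(a: &[i32], b: &[i32], limit: usize, offset: usize) -> Vec<i32> {
    let (mut i, mut j) = (0, 0);
    let mut merged = Vec::new();
    // Stop once limit + offset rows are collected, not just limit.
    while merged.len() < limit + offset && (i < a.len() || j < b.len()) {
        if j == b.len() || (i < a.len() && a[i] <= b[j]) {
            merged.push(a[i]);
            i += 1;
        } else {
            merged.push(b[j]);
            j += 1;
        }
    }
    merged.into_iter().skip(offset).collect()
}

fn main() {
    let a = [1, 3, 5, 7, 9];
    let b = [2, 4, 6, 8, 10];
    // limit=4, offset=2: merge 1..=6, skip 1,2, return 3,4,5,6.
    assert_eq!(merge_take(&a, &b, 4, 2), vec![3, 4, 5, 6]);
}
```

With the buggy bound, the same call would merge only `1,2,3,4` and return just two rows after the skip.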
##########
native-engine/datafusion-ext-plans/src/sort_exec.rs:
##########
@@ -1023,6 +1042,22 @@ impl<B: SortedBlock> Merger<B> {
}
Ok(Some((key_collector, pruned_batch)))
}
+
+ pub fn skip_rows<KC: KeyCollector>(
+ &mut self,
+ skip: usize,
+ suggested_batch_size: usize,
+ ) -> Result<()> {
+ let mut remaining = skip;
+ while remaining > 0 {
+ let batch_size = remaining.min(suggested_batch_size);
+ if self.next::<KC>(batch_size)?.is_none() {
+ break;
+ }
+ remaining -= batch_size;
+ }
+ Ok(())
+ }
Review Comment:
The `skip_rows` method calls `next()` to skip rows, which increments
`num_total_output_rows`. This causes skipped rows to count against the limit,
resulting in incorrect behavior. After skipping N rows, the remaining limit
becomes `limit - N` instead of staying at `limit`. The skipping logic should
not increment `num_total_output_rows`, or the limit should be adjusted to
`limit + offset` before skipping.
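The accounting problem can be sketched as follows (hypothetical `Merger` and `next_batch`, not the PR's types): when skipping reuses the same path that increments the output counter, the budget must be raised to `limit + offset` so the limit applies to post-offset rows.

```rust
// Sketch: skipping via the counted next_batch() path charges the skipped rows
// against the output counter, so the row budget must be limit + offset.
struct Merger {
    rows: std::vec::IntoIter<i32>,
    num_total_output_rows: usize,
}

impl Merger {
    fn next_batch(&mut self, n: usize) -> Vec<i32> {
        let batch: Vec<i32> = self.rows.by_ref().take(n).collect();
        self.num_total_output_rows += batch.len(); // skipped rows count too
        batch
    }
}

fn main() {
    let mut m = Merger {
        rows: (0..10).collect::<Vec<i32>>().into_iter(),
        num_total_output_rows: 0,
    };
    let (limit, offset) = (8usize, 3usize);
    let _skipped = m.next_batch(offset); // counter is now 3
    let mut out = Vec::new();
    // Budget limit + offset total rows so the limit isn't eaten by the skip.
    while m.num_total_output_rows < limit + offset {
        let batch = m.next_batch(1);
        if batch.is_empty() {
            break;
        }
        out.extend(batch);
    }
    assert_eq!(out, vec![3, 4, 5, 6, 7, 8, 9]);
}
```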
##########
native-engine/datafusion-ext-plans/src/limit_exec.rs:
##########
@@ -222,4 +270,31 @@ mod test {
assert_eq!(row_count, Precision::Exact(2));
Ok(())
}
+
+ #[tokio::test]
+ async fn test_limit_with_offset() -> Result<()> {
+ let input = build_table(
+ ("a", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+ ("b", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]),
+ ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
+ );
+ let limit_exec = LimitExec::new(input, 7, 5);
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let output = limit_exec.execute(0, task_ctx).unwrap();
+ let batches = common::collect(output).await?;
+ let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+
+ let expected = vec![
+ "+---+---+---+",
+ "| a | b | c |",
+ "+---+---+---+",
+ "| 5 | 4 | 0 |",
+ "| 6 | 3 | 1 |",
+ "+---+---+---+",
+ ];
+ assert_batches_eq!(expected, &batches);
+ assert_eq!(row_count, 2);
Review Comment:
The test expectations are incorrect. With input having 10 rows (0-9),
limit=7, and offset=5, the expected behavior should be: skip 5 rows (0-4), then
return up to 7 rows from the remaining data (rows 5-9). This should return 5
rows (5, 6, 7, 8, 9), not 2 rows (5, 6). The current test validates the buggy
behavior caused by the incorrect calculation `remaining = limit - skip` in the
implementation.
```suggestion
"| 7 | 2 | 2 |",
"| 8 | 1 | 3 |",
"| 9 | 0 | 4 |",
"+---+---+---+",
];
assert_batches_eq!(expected, &batches);
assert_eq!(row_count, 5);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]