This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new ab7a5cd57 fix: Handle case where num_cols == 0 in native execution (#1840)
ab7a5cd57 is described below

commit ab7a5cd57fd710e0461066f681ca82382bcb3245
Author: Andy Grove <agr...@apache.org>
AuthorDate: Wed Jun 4 11:21:56 2025 -0600

    fix: Handle case where num_cols == 0 in native execution (#1840)
---
 native/core/src/execution/jni_api.rs               | 46 ++++++++++++----------
 .../org/apache/comet/exec/CometExecSuite.scala     | 26 ++++++++++++
 2 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index b371f6be7..41be8b0a9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -310,31 +310,35 @@ fn prepare_output(
     let results = output_batch.columns();
     let num_rows = output_batch.num_rows();
 
-    if results.len() != num_cols {
-        return Err(CometError::Internal(format!(
-            "Output column count mismatch: expected {num_cols}, got {}",
-            results.len()
-        )));
-    }
+    // there are edge cases where num_cols can be zero due to Spark optimizations
+    // when the results of a query are not used
+    if num_cols > 0 {
+        if results.len() != num_cols {
+            return Err(CometError::Internal(format!(
+                "Output column count mismatch: expected {num_cols}, got {}",
+                results.len()
+            )));
+        }
 
-    if validate {
-        // Validate the output arrays.
-        for array in results.iter() {
-            let array_data = array.to_data();
-            array_data
-                .validate_full()
-                .expect("Invalid output array data");
+        if validate {
+            // Validate the output arrays.
+            for array in results.iter() {
+                let array_data = array.to_data();
+                array_data
+                    .validate_full()
+                    .expect("Invalid output array data");
+            }
         }
-    }
 
-    let mut i = 0;
-    while i < results.len() {
-        let array_ref = results.get(i).ok_or(CometError::IndexOutOfBounds(i))?;
-        array_ref
-            .to_data()
-            .move_to_spark(array_addrs[i], schema_addrs[i])?;
+        let mut i = 0;
+        while i < results.len() {
+            let array_ref = results.get(i).ok_or(CometError::IndexOutOfBounds(i))?;
+            array_ref
+                .to_data()
+                .move_to_spark(array_addrs[i], schema_addrs[i])?;
 
-        i += 1;
+            i += 1;
+        }
     }
 
     Ok(num_rows as jlong)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 28a369ead..26cae2f8e 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -139,6 +139,32 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  // repro for https://github.com/apache/datafusion-comet/issues/1251
+  test("subquery/exists-subquery/exists-orderby-limit.sql") {
+    withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
+      val table = "src"
+      withTable(table) {
+        sql(s"CREATE TABLE $table (key INT, value STRING) USING PARQUET")
+        sql(s"INSERT INTO $table VALUES(238, 'val_238')")
+
+        // the subquery returns the distinct group by values
+        checkSparkAnswerAndOperator(s"""SELECT * FROM $table
+             |WHERE EXISTS (SELECT MAX(key)
+             |FROM $table
+             |GROUP BY value
+             |LIMIT 1
+             |OFFSET 2)""".stripMargin)
+
+        checkSparkAnswerAndOperator(s"""SELECT * FROM $table
+             |WHERE NOT EXISTS (SELECT MAX(key)
+             |FROM $table
+             |GROUP BY value
+             |LIMIT 1
+             |OFFSET 2)""".stripMargin)
+      }
+    }
+  }
+
   test("Sort on single struct should fallback to Spark") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org