This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new ab7a5cd57 fix: Handle case where num_cols == 0 in native execution (#1840)
ab7a5cd57 is described below

commit ab7a5cd57fd710e0461066f681ca82382bcb3245
Author: Andy Grove <agr...@apache.org>
AuthorDate: Wed Jun 4 11:21:56 2025 -0600

    fix: Handle case where num_cols == 0 in native execution (#1840)
---
 native/core/src/execution/jni_api.rs               | 46 ++++++++++++----------
 .../org/apache/comet/exec/CometExecSuite.scala     | 26 ++++++++++++
 2 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index b371f6be7..41be8b0a9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -310,31 +310,35 @@ fn prepare_output(
     let results = output_batch.columns();
     let num_rows = output_batch.num_rows();
 
-    if results.len() != num_cols {
-        return Err(CometError::Internal(format!(
-            "Output column count mismatch: expected {num_cols}, got {}",
-            results.len()
-        )));
-    }
+    // there are edge cases where num_cols can be zero due to Spark optimizations
+    // when the results of a query are not used
+    if num_cols > 0 {
+        if results.len() != num_cols {
+            return Err(CometError::Internal(format!(
+                "Output column count mismatch: expected {num_cols}, got {}",
+                results.len()
+            )));
+        }
 
-    if validate {
-        // Validate the output arrays.
-        for array in results.iter() {
-            let array_data = array.to_data();
-            array_data
-                .validate_full()
-                .expect("Invalid output array data");
+        if validate {
+            // Validate the output arrays.
+            for array in results.iter() {
+                let array_data = array.to_data();
+                array_data
+                    .validate_full()
+                    .expect("Invalid output array data");
+            }
         }
-    }
 
-    let mut i = 0;
-    while i < results.len() {
-        let array_ref = results.get(i).ok_or(CometError::IndexOutOfBounds(i))?;
-        array_ref
-            .to_data()
-            .move_to_spark(array_addrs[i], schema_addrs[i])?;
+        let mut i = 0;
+        while i < results.len() {
+            let array_ref = results.get(i).ok_or(CometError::IndexOutOfBounds(i))?;
+            array_ref
+                .to_data()
+                .move_to_spark(array_addrs[i], schema_addrs[i])?;
 
-        i += 1;
+            i += 1;
+        }
     }
 
     Ok(num_rows as jlong)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 28a369ead..26cae2f8e 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -139,6 +139,32 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  // repro for https://github.com/apache/datafusion-comet/issues/1251
+  test("subquery/exists-subquery/exists-orderby-limit.sql") {
+    withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
+      val table = "src"
+      withTable(table) {
+        sql(s"CREATE TABLE $table (key INT, value STRING) USING PARQUET")
+        sql(s"INSERT INTO $table VALUES(238, 'val_238')")
+
+        // the subquery returns the distinct group by values
+        checkSparkAnswerAndOperator(s"""SELECT * FROM $table
+             |WHERE EXISTS (SELECT MAX(key)
+             |FROM $table
+             |GROUP BY value
+             |LIMIT 1
+             |OFFSET 2)""".stripMargin)
+
+        checkSparkAnswerAndOperator(s"""SELECT * FROM $table
+             |WHERE NOT EXISTS (SELECT MAX(key)
+             |FROM $table
+             |GROUP BY value
+             |LIMIT 1
+             |OFFSET 2)""".stripMargin)
+      }
+    }
+  }
+
   test("Sort on single struct should fallback to Spark") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",

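For readers following along outside the patch context: the new guard exists because an Arrow batch can legally carry a non-zero row count with zero columns, which Spark produces when none of a plan's output columns are actually consumed (as in the EXISTS/GROUP BY repro added above). The following is a minimal standalone sketch of that edge case, not Comet code; it assumes the arrow crate's RecordBatchOptions / try_new_with_options API, and the surrounding scaffolding is purely illustrative.

    // Illustrative sketch only, not part of the commit: shows the zero-column,
    // non-zero-row batch shape that prepare_output now tolerates.
    use std::sync::Arc;

    use arrow::datatypes::Schema;
    use arrow::record_batch::{RecordBatch, RecordBatchOptions};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // A batch with no columns but a row count of 3, as Spark can request
        // when the results of a query are not used.
        let schema = Arc::new(Schema::empty());
        let options = RecordBatchOptions::new().with_row_count(Some(3));
        let batch = RecordBatch::try_new_with_options(schema, vec![], &options)?;

        // The patched code path: only validate and export columns when there
        // are any; otherwise just hand the row count back to the JVM.
        if batch.num_columns() > 0 {
            // per-column validation and Arrow FFI export would happen here
        }
        println!("rows = {}", batch.num_rows()); // prints: rows = 3
        Ok(())
    }

When columns are present the exported behavior is unchanged; only the previously failing zero-column case now returns the row count instead of an internal error.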
