This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 995404c  fix: Avoid exception caused by broadcasting empty result (#92)
995404c is described below

commit 995404cf0f15702135a3cf47fc3ed7de6c54f52f
Author: Zhen Wang <643348...@qq.com>
AuthorDate: Sat Feb 24 00:37:12 2024 +0800

    fix: Avoid exception caused by broadcasting empty result (#92)
---
 .../comet/execution/shuffle/CometShuffleManager.scala |  2 +-
 .../scala/org/apache/spark/sql/comet/operators.scala  |  6 +++++-
 .../scala/org/apache/comet/exec/CometExecSuite.scala  | 19 +++++++++++++++++++
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index 51e6df5..cb34225 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -52,7 +52,7 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
         " Shuffle will continue to spill to disk when necessary.")
   }
 
-  private val sortShuffleManager = new SortShuffleManager(conf);
+  private val sortShuffleManager = new SortShuffleManager(conf)
 
   /**
   * A mapping from shuffle ids to the task ids of mappers producing output for those shuffles.
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 07b8d5c..29e0cf2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -82,7 +82,11 @@ abstract class CometExec extends CometPlan {
 
       out.flush()
       out.close()
-      Iterator((count, cbbos.toChunkedByteBuffer))
+      if (out.size() > 0) {
+        Iterator((count, cbbos.toChunkedByteBuffer))
+      } else {
+        Iterator((count, new ChunkedByteBuffer(Array.empty[ByteBuffer])))
+      }
     }
   }
 
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index d3a1bd2..eb5a8e9 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -80,6 +80,25 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("CometBroadcastExchangeExec: empty broadcast") {
+    withSQLConf(CometConf.COMET_EXEC_BROADCAST_ENABLED.key -> "true") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
+        withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
+          val df = sql(
+            "SELECT /*+ BROADCAST(a) */ *" +
+              " FROM (SELECT * FROM tbl_a WHERE _1 < 0) a JOIN tbl_b b" +
+              " ON a._1 = b._1")
+          val nativeBroadcast = find(df.queryExecution.executedPlan) {
+            case _: CometBroadcastExchangeExec => true
+            case _ => false
+          }.get.asInstanceOf[CometBroadcastExchangeExec]
+          val rows = nativeBroadcast.executeCollect()
+          assert(rows.isEmpty)
+        }
+      }
+    }
+  }
+
  test("CometExec.executeColumnarCollectIterator can collect ColumnarBatch results") {
     withSQLConf(
       CometConf.COMET_EXEC_ENABLED.key -> "true",

Reply via email to