This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 995404c fix: Avoid exception caused by broadcasting empty result (#92) 995404c is described below commit 995404cf0f15702135a3cf47fc3ed7de6c54f52f Author: Zhen Wang <643348...@qq.com> AuthorDate: Sat Feb 24 00:37:12 2024 +0800 fix: Avoid exception caused by broadcasting empty result (#92) --- .../comet/execution/shuffle/CometShuffleManager.scala | 2 +- .../scala/org/apache/spark/sql/comet/operators.scala | 6 +++++- .../scala/org/apache/comet/exec/CometExecSuite.scala | 19 +++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala index 51e6df5..cb34225 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala @@ -52,7 +52,7 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { " Shuffle will continue to spill to disk when necessary.") } - private val sortShuffleManager = new SortShuffleManager(conf); + private val sortShuffleManager = new SortShuffleManager(conf) /** * A mapping from shuffle ids to the task ids of mappers producing output for those shuffles. 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 07b8d5c..29e0cf2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -82,7 +82,11 @@ abstract class CometExec extends CometPlan { out.flush() out.close() - Iterator((count, cbbos.toChunkedByteBuffer)) + if (out.size() > 0) { + Iterator((count, cbbos.toChunkedByteBuffer)) + } else { + Iterator((count, new ChunkedByteBuffer(Array.empty[ByteBuffer]))) + } } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index d3a1bd2..eb5a8e9 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -80,6 +80,25 @@ class CometExecSuite extends CometTestBase { } } + test("CometBroadcastExchangeExec: empty broadcast") { + withSQLConf(CometConf.COMET_EXEC_BROADCAST_ENABLED.key -> "true") { + withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") { + withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") { + val df = sql( + "SELECT /*+ BROADCAST(a) */ *" + + " FROM (SELECT * FROM tbl_a WHERE _1 < 0) a JOIN tbl_b b" + + " ON a._1 = b._1") + val nativeBroadcast = find(df.queryExecution.executedPlan) { + case _: CometBroadcastExchangeExec => true + case _ => false + }.get.asInstanceOf[CometBroadcastExchangeExec] + val rows = nativeBroadcast.executeCollect() + assert(rows.isEmpty) + } + } + } + } + test("CometExec.executeColumnarCollectIterator can collect ColumnarBatch results") { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true",