This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f350b760ea11 [SPARK-47270][SQL] Dataset.isEmpty projects
CommandResults locally
f350b760ea11 is described below
commit f350b760ea117a2d670720e37ba7be3fd8c0de0a
Author: Zhen Wang <[email protected]>
AuthorDate: Mon Mar 11 19:33:43 2024 +0800
[SPARK-47270][SQL] Dataset.isEmpty projects CommandResults locally
### What changes were proposed in this pull request?
Similar to #40779, `Dataset.isEmpty` should also not trigger job execution
on CommandResults.
This PR converts `CommandResult` to `LocalRelation` in `Dataset.isEmpty`
method.
### Why are the changes needed?
A simple `spark.sql("show tables").isEmpty` should not require an executor.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new UT.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45373 from wForget/SPARK-47270.
Authored-by: Zhen Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 22 ++++++++++++++--------
.../scala/org/apache/spark/sql/DatasetSuite.scala | 19 +++++++++++++++++++
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f0c9f7ae53fc..c2e51b574df7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -271,13 +271,7 @@ class Dataset[T] private[sql](
private[sql] def getRows(
numRows: Int,
truncate: Int): Seq[Seq[String]] = {
- val newDf = logicalPlan match {
- case c: CommandResult =>
- // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the
casting locally to
- // avoid triggering a job
- Dataset.ofRows(sparkSession, LocalRelation(c.output, c.rows))
- case _ => toDF()
- }
+ val newDf = commandResultOptimized.toDF()
val castCols = newDf.logicalPlan.output.map { col =>
Column(ToPrettyString(col))
}
@@ -655,7 +649,8 @@ class Dataset[T] private[sql](
* @group basic
* @since 2.4.0
*/
- def isEmpty: Boolean = withAction("isEmpty",
select().limit(1).queryExecution) { plan =>
+ def isEmpty: Boolean = withAction("isEmpty",
+ commandResultOptimized.select().limit(1).queryExecution) { plan =>
plan.executeTake(1).isEmpty
}
@@ -4471,6 +4466,17 @@ class Dataset[T] private[sql](
}
}
+ /** Returns an optimized plan for CommandResult, converting to `LocalRelation`.
*/
+ private def commandResultOptimized: Dataset[T] = {
+ logicalPlan match {
+ case c: CommandResult =>
+ // Convert to `LocalRelation` and let `ConvertToLocalRelation` do the
casting locally to
+ // avoid triggering a job
+ Dataset(sparkSession, LocalRelation(c.output, c.rows))
+ case _ => this
+ }
+ }
+
/** Convert to an RDD of serialized ArrowRecordBatches. */
private[sql] def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
val schemaCaptured = this.schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index bb18769a23b4..91057dcc98e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2719,6 +2719,25 @@ class DatasetSuite extends QueryTest
checkDataset(ds.map(t => t),
WithSet(0, HashSet("foo", "bar")), WithSet(1, HashSet("bar", "zoo")))
}
+
+ test("SPARK-47270: isEmpty does not trigger job execution on
CommandResults") {
+ withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
+ withTable("t1") {
+ sql("create table t1(c int) using parquet")
+
+ @volatile var jobCounter = 0
+ val listener = new SparkListener {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ jobCounter += 1
+ }
+ }
+ withListener(spark.sparkContext, listener) { _ =>
+ sql("show tables").isEmpty
+ }
+ assert(jobCounter === 0)
+ }
+ }
+ }
}
class DatasetLargeResultCollectingSuite extends QueryTest
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]