This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5199d2f  [SPARK-30780][SQL] Empty LocalTableScan should use RDD 
without partitions
5199d2f is described below

commit 5199d2f9dcf044f759318457ce3c0a56e00d9537
Author: herman <[email protected]>
AuthorDate: Wed Feb 12 10:48:29 2020 +0900

    [SPARK-30780][SQL] Empty LocalTableScan should use RDD without partitions
    
    ### What changes were proposed in this pull request?
    This is a small follow-up for https://github.com/apache/spark/pull/27400. 
This PR makes an empty `LocalTableScanExec` return an `RDD` without partitions.
    
    ### Why are the changes needed?
    It is a bit unexpected that the RDD contains partitions if there is not 
work to do. It also can save a bit of work when this is used in a more complex 
plan.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Added test to `SparkPlanSuite`.
    
    Closes #27530 from hvanhovell/SPARK-30780.
    
    Authored-by: herman <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit b25359cca3190f6a34dce3c3e49c4d2a80e88bdc)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../org/apache/spark/sql/execution/LocalTableScanExec.scala  | 12 ++++++++----
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala |  2 +-
 .../org/apache/spark/sql/execution/SparkPlanSuite.scala      |  4 ++++
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 1b5115f..b452213 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -45,10 +45,14 @@ case class LocalTableScanExec(
     }
   }
 
-  private lazy val numParallelism: Int = math.min(math.max(unsafeRows.length, 
1),
-    sqlContext.sparkContext.defaultParallelism)
-
-  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, 
numParallelism)
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    if (rows.isEmpty) {
+      sqlContext.sparkContext.emptyRDD
+    } else {
+      val numSlices = math.min(unsafeRows.length, 
sqlContext.sparkContext.defaultParallelism)
+      sqlContext.sparkContext.parallelize(unsafeRows, numSlices)
+    }
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d2d58a8..694e576 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -330,7 +330,7 @@ class DataFrameSuite extends QueryTest
       testData.select("key").coalesce(1).select("key"),
       testData.select("key").collect().toSeq)
 
-    assert(spark.emptyDataFrame.coalesce(1).rdd.partitions.size === 1)
+    assert(spark.emptyDataFrame.coalesce(1).rdd.partitions.size === 0)
   }
 
   test("convert $\"attribute name\" into unresolved attribute") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index e3bc414..56fff11 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -84,4 +84,8 @@ class SparkPlanSuite extends QueryTest with 
SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-30780 empty LocalTableScan should use RDD without partitions") {
+    assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to