This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 935aa8c  [SPARK-32985][SQL][FOLLOWUP] Rename createNonBucketedReadRDD and minor change in FileSourceScanExec
935aa8c is described below

commit 935aa8c8db6824648483f26c2889c33030985259
Author: Cheng Su <chen...@fb.com>
AuthorDate: Tue Mar 30 19:57:32 2021 +0900

    [SPARK-32985][SQL][FOLLOWUP] Rename createNonBucketedReadRDD and minor change in FileSourceScanExec
    
    ### What changes were proposed in this pull request?
    
    This PR is a follow-up change to address comments in https://github.com/apache/spark/pull/31413#discussion_r603280965 and https://github.com/apache/spark/pull/31413#discussion_r603296475 . Minor change in `FileSourceScanExec`. No actual logic change here.
    
    ### Why are the changes needed?
    
    Better readability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #32000 from c21/bucket-scan.
    
    Authored-by: Cheng Su <chen...@fb.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../apache/spark/sql/execution/DataSourceScanExec.scala  | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 38e63d4..6fa4167 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -410,7 +410,7 @@ case class FileSourceScanExec(
       createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
         relation)
     } else {
-      createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
+      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
     }
     sendDriverMetrics()
     readRDD
@@ -518,7 +518,7 @@ case class FileSourceScanExec(
 
   /**
    * Create an RDD for bucketed reads.
-   * The non-bucketed variant of this function is [[createNonBucketedReadRDD]].
+   * The non-bucketed variant of this function is [[createReadRDD]].
    *
    * The algorithm is pretty simple: each RDD partition being returned should include all the files
    * with the same bucket id from all the given Hive partitions.
@@ -580,7 +580,7 @@ case class FileSourceScanExec(
   * @param selectedPartitions Hive-style partitions that are part of the read.
    * @param fsRelation [[HadoopFsRelation]] associated with the read.
    */
-  private def createNonBucketedReadRDD(
+  private def createReadRDD(
       readFile: (PartitionedFile) => Iterator[InternalRow],
       selectedPartitions: Array[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
@@ -594,14 +594,8 @@ case class FileSourceScanExec(
     val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
     val shouldProcess: Path => Boolean = optionalBucketSet match {
       case Some(bucketSet) if bucketingEnabled =>
-        filePath => {
-          BucketingUtils.getBucketId(filePath.getName) match {
-            case Some(id) => bucketSet.get(id)
-            case None =>
-              // Do not prune the file if bucket file name is invalid
-              true
-          }
-        }
+        // Do not prune the file if bucket file name is invalid
+        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
       case _ =>
         _ => true
     }

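For context on the last hunk: Option.forall(p) returns p(id) for Some(id) and true for None, so the one-liner keeps the old behavior of never pruning a file whose name carries no parsable bucket id. Below is a minimal, self-contained Scala sketch of that equivalence; the getBucketId regex is a hypothetical stand-in for Spark's internal BucketingUtils, and scala.collection.immutable.BitSet stands in for Spark's BitSet (whose membership test is .get), so details differ from the real code.

// Sketch only: shows Option.forall matching the old Some/None pattern match.
import scala.collection.immutable.BitSet

object BucketPruningSketch {
  // Hypothetical stand-in for BucketingUtils.getBucketId: pulls the bucket id
  // out of file names shaped like "part-00000_00003.snappy.parquet".
  private val Bucketed = """.*_(\d+)(?:\..*)?""".r

  def getBucketId(fileName: String): Option[Int] = fileName match {
    case Bucketed(id) => Some(id.toInt)
    case _            => None
  }

  def main(args: Array[String]): Unit = {
    val bucketSet = BitSet(1, 3) // buckets selected by pruning

    // Old form: explicit pattern match.
    val oldPredicate: String => Boolean = name =>
      getBucketId(name) match {
        case Some(id) => bucketSet(id)
        case None     => true // do not prune the file if bucket file name is invalid
      }

    // New form: Option.forall collapses both cases into one expression.
    val newPredicate: String => Boolean = name => getBucketId(name).forall(bucketSet)

    Seq("part-00000_00001.parquet", "part-00000_00002.parquet", "not-bucketed.parquet")
      .foreach { f =>
        assert(oldPredicate(f) == newPredicate(f))
        println(s"$f -> keep = ${newPredicate(f)}")
      }
  }
}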
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
