huberylee commented on a change in pull request #4982:
URL: https://github.com/apache/hudi/pull/4982#discussion_r823278371



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
##########
@@ -110,16 +140,111 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    */
   def listFileSlices(partitionFilters: Seq[Expression]): Map[String, 
Seq[FileSlice]] = {
     // Prune the partition path by the partition filters
-    val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
-      cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
+    val prunedPartitions = 
prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
     prunedPartitions.map(partition => {
       (partition.path, cachedAllInputFileSlices.get(partition).asScala)
     }).toMap
   }
 
+  /**
+   * Prune the partition by the filter.This implementation is fork from
+   * 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
+   *
+   * @param partitionPaths All the partition paths.
+   * @param predicates     The filter condition.
+   * @return The Pruned partition paths.
+   */
+  def prunePartition(partitionPaths: Seq[PartitionPath], predicates: 
Seq[Expression]): Seq[PartitionPath] = {
+    val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+    if (partitionPruningPredicates.nonEmpty) {
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+      val boundPredicate = InterpretedPredicate(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionSchema.indexWhere(a.name == _.name)
+          BoundReference(index, partitionSchema(index).dataType, nullable = 
true)
+      })
+
+      val prunedPartitionPaths = partitionPaths.filter {
+        partitionPath => 
boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
+      }
+
+      logInfo(s"Total partition size is: ${partitionPaths.size}," +
+        s" after partition prune size is: ${prunedPartitionPaths.size}")
+      prunedPartitionPaths
+    } else {
+      partitionPaths
+    }
+  }
+
   protected def parsePartitionColumnValues(partitionColumns: Array[String], 
partitionPath: String): Array[Object] = {
-    HoodieCommonUtils.parsePartitionColumnValues(sparkParsePartitionUtil, 
configProperties,
-      basePath, partitionSchema, partitionColumns, partitionPath)
+    if (partitionColumns.length == 0) {

Review comment:
       Yes, I'm just moving this code back with no additional changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to