huberylee commented on a change in pull request #4982:
URL: https://github.com/apache/hudi/pull/4982#discussion_r823276254



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
##########
@@ -120,37 +113,45 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp
 
   override def build: Procedure = new RunClusteringProcedure()
 
-  def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate: 
Expression): String = {
-    val partitionSchema = 
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
-
-    // Get tableName meta data
-    val engineContext = new HoodieSparkEngineContext(new 
JavaSparkContext(sparkSession.sparkContext))
-    val properties = new Properties()
-    
properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
-    val metadataConfig = 
HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
-    val tableMetadata = HoodieTableMetadata.create(engineContext, 
metadataConfig, metaClient.getBasePath,
-      FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
-    val sparkParsePartitionUtil = 
sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
-    val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession, 
Map.empty)
-
-    val partitionColumns = 
metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-
-    // Translate all partition path to {@code 
org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
-    val partitionPaths = 
tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
-      val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
-        sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
-        partitionSchema, partitionColumns, partitionPath)
-      new PartitionPath(partitionPath, partitionColumnValues)
-    })
+  def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = {
+    val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
+    val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
+      FileStatusCache.getOrCreate(sparkSession))
+
+    // Resolve partition predicates, only conjunctive predicates are supported
+    val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession, predicate,

Review comment:
       Done

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
##########
@@ -120,37 +113,45 @@ class RunClusteringProcedure extends BaseProcedure with ProcedureBuilder with Sp
 
   override def build: Procedure = new RunClusteringProcedure()
 
-  def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate: 
Expression): String = {
-    val partitionSchema = 
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
-
-    // Get tableName meta data
-    val engineContext = new HoodieSparkEngineContext(new 
JavaSparkContext(sparkSession.sparkContext))
-    val properties = new Properties()
-    
properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
-    val metadataConfig = 
HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
-    val tableMetadata = HoodieTableMetadata.create(engineContext, 
metadataConfig, metaClient.getBasePath,
-      FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
-    val sparkParsePartitionUtil = 
sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
-    val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession, 
Map.empty)
-
-    val partitionColumns = 
metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-
-    // Translate all partition path to {@code 
org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
-    val partitionPaths = 
tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
-      val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
-        sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
-        partitionSchema, partitionColumns, partitionPath)
-      new PartitionPath(partitionPath, partitionColumnValues)
-    })
+  def prunePartition(metaClient: HoodieTableMetaClient, predicate: String): String = {
+    val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path" -> metaClient.getBasePath)
+    val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None, options,
+      FileStatusCache.getOrCreate(sparkSession))
+
+    // Resolve partition predicates, only conjunctive predicates are supported
+    val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession, predicate,
+      hoodieFileIndex.partitionSchema)
+    val predicates = splitPredicates(partitionPredicate)
+    checkArgument(predicates._1.isEmpty, "Disjunctive predicate is not supported")
+
+    // Get all partitions and prune partition by predicates
+    val partitionPaths = hoodieFileIndex.getAllCachedPartitionPaths.asScala.toSeq
+    val prunedPartitions = hoodieFileIndex.prunePartition(partitionPaths, predicates._2)

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to