alexeykudinkin commented on a change in pull request #4982:
URL: https://github.com/apache/hudi/pull/4982#discussion_r823040893
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
##########
@@ -120,37 +113,45 @@ class RunClusteringProcedure extends BaseProcedure with
ProcedureBuilder with Sp
override def build: Procedure = new RunClusteringProcedure()
- def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate:
Expression): String = {
- val partitionSchema =
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
-
- // Get tableName meta data
- val engineContext = new HoodieSparkEngineContext(new
JavaSparkContext(sparkSession.sparkContext))
- val properties = new Properties()
-
properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
- val metadataConfig =
HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
- val tableMetadata = HoodieTableMetadata.create(engineContext,
metadataConfig, metaClient.getBasePath,
- FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
- val sparkParsePartitionUtil =
sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
- val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession,
Map.empty)
-
- val partitionColumns =
metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-
- // Translate all partition path to {@code
org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
- val partitionPaths =
tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
- val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
- sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
- partitionSchema, partitionColumns, partitionPath)
- new PartitionPath(partitionPath, partitionColumnValues)
- })
+ def prunePartition(metaClient: HoodieTableMetaClient, predicate: String):
String = {
+ val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path"
-> metaClient.getBasePath)
+ val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None,
options,
+ FileStatusCache.getOrCreate(sparkSession))
+
+ // Resolve partition predicates, only conjunctive predicates are supported
+ val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession,
predicate,
Review comment:
Let's create `HoodieCatalystExpressionUtils` and move
`resolveFilterExpr` in there
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
##########
@@ -120,37 +113,45 @@ class RunClusteringProcedure extends BaseProcedure with
ProcedureBuilder with Sp
override def build: Procedure = new RunClusteringProcedure()
- def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate:
Expression): String = {
- val partitionSchema =
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
-
- // Get tableName meta data
- val engineContext = new HoodieSparkEngineContext(new
JavaSparkContext(sparkSession.sparkContext))
- val properties = new Properties()
-
properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
- val metadataConfig =
HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
- val tableMetadata = HoodieTableMetadata.create(engineContext,
metadataConfig, metaClient.getBasePath,
- FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
- val sparkParsePartitionUtil =
sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
- val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession,
Map.empty)
-
- val partitionColumns =
metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-
- // Translate all partition path to {@code
org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
- val partitionPaths =
tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
- val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
- sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
- partitionSchema, partitionColumns, partitionPath)
- new PartitionPath(partitionPath, partitionColumnValues)
- })
+ def prunePartition(metaClient: HoodieTableMetaClient, predicate: String):
String = {
+ val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path"
-> metaClient.getBasePath)
+ val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None,
options,
+ FileStatusCache.getOrCreate(sparkSession))
+
+ // Resolve partition predicates, only conjunctive predicates are supported
+ val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession,
predicate,
+ hoodieFileIndex.partitionSchema)
+ val predicates = splitPredicates(partitionPredicate)
+ checkArgument(predicates._1.isEmpty, "Disjunctive predicate is not
supported")
+
+ // Get all partitions and prune partition by predicates
+ val partitionPaths =
hoodieFileIndex.getAllCachedPartitionPaths.asScala.toSeq
+ val prunedPartitions = hoodieFileIndex.prunePartition(partitionPaths,
predicates._2)
Review comment:
Instead of invoking pruning here directly, let's encapsulate pruning
within the Index itself and then expose the following API:
```
class SparkHoodieTableFileIndex {
def getPartitionPaths(predicates: Seq[Expression]): Seq[PartitionPath] = {
// prune internally
}
}
```
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
##########
@@ -110,16 +140,111 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String,
Seq[FileSlice]] = {
// Prune the partition path by the partition filters
- val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
- cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
+ val prunedPartitions =
prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.path, cachedAllInputFileSlices.get(partition).asScala)
}).toMap
}
+ /**
+ * Prune the partition by the filter.This implementation is fork from
+ *
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
+ *
+ * @param partitionPaths All the partition paths.
+ * @param predicates The filter condition.
+ * @return The Pruned partition paths.
+ */
+ def prunePartition(partitionPaths: Seq[PartitionPath], predicates:
Seq[Expression]): Seq[PartitionPath] = {
Review comment:
Please check my comment above, and let's make this method private
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
##########
@@ -120,37 +113,45 @@ class RunClusteringProcedure extends BaseProcedure with
ProcedureBuilder with Sp
override def build: Procedure = new RunClusteringProcedure()
- def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate:
Expression): String = {
- val partitionSchema =
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
-
- // Get tableName meta data
- val engineContext = new HoodieSparkEngineContext(new
JavaSparkContext(sparkSession.sparkContext))
- val properties = new Properties()
-
properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
- val metadataConfig =
HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
- val tableMetadata = HoodieTableMetadata.create(engineContext,
metadataConfig, metaClient.getBasePath,
- FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
- val sparkParsePartitionUtil =
sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
- val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession,
Map.empty)
-
- val partitionColumns =
metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-
- // Translate all partition path to {@code
org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
- val partitionPaths =
tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
- val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
- sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
- partitionSchema, partitionColumns, partitionPath)
- new PartitionPath(partitionPath, partitionColumnValues)
- })
+ def prunePartition(metaClient: HoodieTableMetaClient, predicate: String):
String = {
+ val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path"
-> metaClient.getBasePath)
+ val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None,
options,
+ FileStatusCache.getOrCreate(sparkSession))
+
+ // Resolve partition predicates, only conjunctive predicates are supported
+ val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession,
predicate,
+ hoodieFileIndex.partitionSchema)
+ val predicates = splitPredicates(partitionPredicate)
Review comment:
And there's actually a util that does what you need:
`splitPartitionAndDataPredicates`
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
##########
@@ -120,37 +113,45 @@ class RunClusteringProcedure extends BaseProcedure with
ProcedureBuilder with Sp
override def build: Procedure = new RunClusteringProcedure()
- def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate:
Expression): String = {
- val partitionSchema =
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
-
- // Get tableName meta data
- val engineContext = new HoodieSparkEngineContext(new
JavaSparkContext(sparkSession.sparkContext))
- val properties = new Properties()
-
properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
- val metadataConfig =
HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
- val tableMetadata = HoodieTableMetadata.create(engineContext,
metadataConfig, metaClient.getBasePath,
- FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
- val sparkParsePartitionUtil =
sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
- val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession,
Map.empty)
-
- val partitionColumns =
metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-
- // Translate all partition path to {@code
org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
- val partitionPaths =
tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
- val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
- sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
- partitionSchema, partitionColumns, partitionPath)
- new PartitionPath(partitionPath, partitionColumnValues)
- })
+ def prunePartition(metaClient: HoodieTableMetaClient, predicate: String):
String = {
+ val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path"
-> metaClient.getBasePath)
+ val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None,
options,
+ FileStatusCache.getOrCreate(sparkSession))
+
+ // Resolve partition predicates, only conjunctive predicates are supported
+ val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession,
predicate,
+ hoodieFileIndex.partitionSchema)
+ val predicates = splitPredicates(partitionPredicate)
Review comment:
I see that we're splitting, but we don't separate between Data filters and
Partition filters
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
##########
@@ -120,37 +113,45 @@ class RunClusteringProcedure extends BaseProcedure with
ProcedureBuilder with Sp
override def build: Procedure = new RunClusteringProcedure()
- def prunePartition(metaClient: HoodieTableMetaClient, partitionPredicate:
Expression): String = {
- val partitionSchema =
HoodieCommonUtils.getPartitionSchemaFromProperty(metaClient, None)
-
- // Get tableName meta data
- val engineContext = new HoodieSparkEngineContext(new
JavaSparkContext(sparkSession.sparkContext))
- val properties = new Properties()
-
properties.putAll(JavaConverters.mapAsJavaMapConverter(sparkSession.sessionState.conf.getAllConfs).asJava)
- val metadataConfig =
HoodieMetadataConfig.newBuilder().fromProperties(properties).build()
- val tableMetadata = HoodieTableMetadata.create(engineContext,
metadataConfig, metaClient.getBasePath,
- FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
- val sparkParsePartitionUtil =
sparkAdapter.createSparkParsePartitionUtil(sparkSession.sessionState.conf)
- val typedProperties = HoodieCommonUtils.getConfigProperties(sparkSession,
Map.empty)
-
- val partitionColumns =
metaClient.getTableConfig.getPartitionFields.orElse(Array[String]())
-
- // Translate all partition path to {@code
org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath}
- val partitionPaths =
tableMetadata.getAllPartitionPaths.asScala.map(partitionPath => {
- val partitionColumnValues = HoodieCommonUtils.parsePartitionColumnValues(
- sparkParsePartitionUtil, typedProperties, metaClient.getBasePath,
- partitionSchema, partitionColumns, partitionPath)
- new PartitionPath(partitionPath, partitionColumnValues)
- })
+ def prunePartition(metaClient: HoodieTableMetaClient, predicate: String):
String = {
+ val options = Map(QUERY_TYPE.key() -> QUERY_TYPE_SNAPSHOT_OPT_VAL, "path"
-> metaClient.getBasePath)
+ val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, None,
options,
+ FileStatusCache.getOrCreate(sparkSession))
+
+ // Resolve partition predicates, only conjunctive predicates are supported
+ val partitionPredicate = DataSkippingUtils.resolveFilterExpr(sparkSession,
predicate,
+ hoodieFileIndex.partitionSchema)
+ val predicates = splitPredicates(partitionPredicate)
+ checkArgument(predicates._1.isEmpty, "Disjunctive predicate is not
supported")
+
+ // Get all partitions and prune partition by predicates
+ val partitionPaths =
hoodieFileIndex.getAllCachedPartitionPaths.asScala.toSeq
+ val prunedPartitions = hoodieFileIndex.prunePartition(partitionPaths,
predicates._2)
+ prunedPartitions.map(partitionPath =>
partitionPath.getPath).toSet.mkString(",")
+ }
- // Filter partition by predicates
- val selectedPartitions = HoodieCommonUtils.prunePartition(
- partitionSchema, partitionPaths, partitionPredicate)
- selectedPartitions.map(partitionPath =>
partitionPath.getPath).toSet.mkString(",")
+ /**
+ * Split predicate to disjunctive predicates and conjunctive predicates
+ *
+ * @param condition Predicate to be split
+ * @return The pair of disjunctive predicates and conjunctive predicates
+ */
+ private def splitPredicates(condition: Expression): (Seq[Expression],
Seq[Expression]) = {
+ condition match {
Review comment:
Please check my comment above regarding splitting
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
##########
@@ -110,16 +140,111 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
*/
def listFileSlices(partitionFilters: Seq[Expression]): Map[String,
Seq[FileSlice]] = {
// Prune the partition path by the partition filters
- val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
- cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
+ val prunedPartitions =
prunePartition(cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)
prunedPartitions.map(partition => {
(partition.path, cachedAllInputFileSlices.get(partition).asScala)
}).toMap
}
+ /**
+ * Prune the partition by the filter.This implementation is fork from
+ *
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
+ *
+ * @param partitionPaths All the partition paths.
+ * @param predicates The filter condition.
+ * @return The Pruned partition paths.
+ */
+ def prunePartition(partitionPaths: Seq[PartitionPath], predicates:
Seq[Expression]): Seq[PartitionPath] = {
+ val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
+ val partitionPruningPredicates = predicates.filter {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+ if (partitionPruningPredicates.nonEmpty) {
+ val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+ val boundPredicate = InterpretedPredicate(predicate.transform {
+ case a: AttributeReference =>
+ val index = partitionSchema.indexWhere(a.name == _.name)
+ BoundReference(index, partitionSchema(index).dataType, nullable =
true)
+ })
+
+ val prunedPartitionPaths = partitionPaths.filter {
+ partitionPath =>
boundPredicate.eval(InternalRow.fromSeq(partitionPath.values))
+ }
+
+ logInfo(s"Total partition size is: ${partitionPaths.size}," +
+ s" after partition prune size is: ${prunedPartitionPaths.size}")
+ prunedPartitionPaths
+ } else {
+ partitionPaths
+ }
+ }
+
protected def parsePartitionColumnValues(partitionColumns: Array[String],
partitionPath: String): Array[Object] = {
- HoodieCommonUtils.parsePartitionColumnValues(sparkParsePartitionUtil,
configProperties,
- basePath, partitionSchema, partitionColumns, partitionPath)
+ if (partitionColumns.length == 0) {
Review comment:
You're just moving this code back, right? There are no additional changes,
are there?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]