yui2010 commented on a change in pull request #2378:
URL: https://github.com/apache/hudi/pull/2378#discussion_r552497329
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -108,7 +111,7 @@ class MergeOnReadSnapshotRelation(val sqlContext:
SQLContext,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
Review comment:
Hi @garyli1019, sorry for the late reply.
I don't completely understand "if we need to support the partition
pruning, we will need to add partition information to the partitionedFile"
which you mentioned.
Unless I have misunderstood your description, I think we can support
partition pruning in `buildFileIndex` without additional info.
The current implementation supports partition pruning with two main steps,
as follows:
1. `val inMemoryFileIndex =
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)`
`InMemoryFileIndex#refresh0` will list all the leaf files, for example:
```
/hudi/data/order/dt=20200610/a.parquet
/hudi/data/order/dt=20200807/b.parquet
/hudi/data/order/dt=20200808/c.parquet
/hudi/data/order/dt=20200808/c_1.log
/hudi/data/order/dt=20200808/d.parquet
/hudi/data/order/dt=20200809/e.parquet
```
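As a side note, the listing step can be sketched in a standalone way. This uses `java.nio.file` instead of Hadoop's `FileSystem`, so it is only an illustration of the recursive leaf-file listing, not the actual `InMemoryFileIndex` internals (`LeafFileListing` is a hypothetical name):

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Illustrative stand-in for InMemoryFileIndex#refresh0: recursively list
// every leaf (regular) file under a root directory. The real implementation
// works against Hadoop's FileSystem and caches the result.
object LeafFileListing {
  def listLeafFiles(root: Path): Seq[Path] = {
    val stream = Files.walk(root)
    try stream.iterator().asScala.filter(p => Files.isRegularFile(p)).toSeq
    finally stream.close()
  }
}
```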
2. `inMemoryFileIndex.listFiles(filters, filters)`
`PartitioningAwareFileIndex#inferPartitioning()` will infer the partition
columns and path info, for example:
```
PartitionSpec(
  partitionColumns = StructType(
    StructField(name = "dt", dataType = StringType, nullable = true)),
  partitions = Seq(
    Partition(values = Row("20200610"), path = "hdfs://hudi/data/order/dt=20200610"),
    Partition(values = Row("20200807"), path = "hdfs://hudi/data/order/dt=20200807"),
    Partition(values = Row("20200808"), path = "hdfs://hudi/data/order/dt=20200808"),
    Partition(values = Row("20200809"), path = "hdfs://hudi/data/order/dt=20200809")))
```
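The inference step can be illustrated with a small standalone sketch: each `k=v` directory segment of a leaf file's path becomes a partition column value. The names `PartitionDir` and `inferPartition` are hypothetical, not the Spark internals:

```scala
// Simplified illustration of PartitioningAwareFileIndex#inferPartitioning:
// recover (column -> value) pairs from Hive-style "dt=20200808" segments.
object PartitionInference {
  // Minimal stand-in for Spark's Partition(values, path).
  final case class PartitionDir(values: Map[String, String], path: String)

  def inferPartition(path: String): PartitionDir = {
    val values = path.split('/').collect {
      case seg if seg.contains('=') =>
        val i = seg.indexOf('=')
        seg.substring(0, i) -> seg.substring(i + 1)  // "dt" -> "20200808"
    }.toMap
    PartitionDir(values, path)
  }
}
```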
`PartitioningAwareFileIndex#prunePartitions` will filter out the unneeded paths
through the `partitionFilters` parameter.
If our SQL is `select rkey, date from test_order where dt == '20200808'`,
we will get `Partition(values = Row("20200808"), path =
"hdfs://hudi/data/order/dt=20200808")`,
and `MergeOnReadSnapshotRelation#buildFileIndex` will return two
`HoodieMergeOnReadFileSplit`s (c.parquet and d.parquet).
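To make the pruning concrete, here is a hedged standalone sketch with hypothetical names (not the actual Hudi/Spark classes): keep only the leaf files whose partition directory satisfies the pushed-down predicate, then let each surviving base `.parquet` file drive one split, with `.log` files merged against their base file at read time:

```scala
// Illustrative pruning: keep files under partition directories matching
// column=value, then take the base .parquet files, each of which becomes
// one file split (log files are attached to their base file when reading).
object PartitionPruning {
  def prunePartitions(files: Seq[String], column: String, value: String): Seq[String] =
    files.filter(_.contains(s"$column=$value/"))

  def baseFiles(files: Seq[String]): Seq[String] =
    files.filter(_.endsWith(".parquet"))
}
```

With the leaf-file listing from step 1, pruning on `dt=20200808` keeps c.parquet, c_1.log, and d.parquet; the two base files yield the two splits.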
Some test case logs are as follows:
```
12657 [main] INFO
org.apache.spark.sql.execution.datasources.InMemoryFileIndex - Selected 1
partitions out of 4, pruned 75.0% partitions.
12711 [main] INFO
org.apache.hudi.common.table.view.HoodieTableFileSystemView - Adding
file-groups for partition :dt=20200808, #FileGroups=2
73169 [main] INFO org.apache.spark.scheduler.DAGScheduler - Job 1
finished: show at HudiAnalyticSql.java:281, took 24.457325 s
+----+-------------------+
|rkey|date |
+----+-------------------+
|4 |2020-08-08 09:58:48|
|20 |2020-08-08 16:55:59|
|21 |2020-08-08 17:53:59|
+----+-------------------+
```
So we can support partition pruning in `buildFileIndex` without
additional info.
You also mentioned "pass the partitionSchema into the parquet reader": do you
mean the query result will include the partition columns?
I agree with you that `PrunedFilteredScan` and `CatalystScan` only support
filter pushdown and column pruning.
About the implementation on Spark 2.x DataSource V1 and V2 (not supported there),
we have two options:
1. In order to reuse `PartitioningAwareFileIndex#listFiles`, convert
`filters: Array[Filter]` to `filters: Seq[Expression]`.
2. Implement our own `listFiles(filters: Array[Filter])`.
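Option 1 can be sketched as follows. To stay self-contained, this uses a simplified stand-in ADT for `org.apache.spark.sql.sources.Filter` and targets a plain predicate over partition values instead of Catalyst's `Expression`; all names here are illustrative only:

```scala
// Simplified stand-ins for the DataSource V1 Filter hierarchy.
sealed trait SimpleFilter
final case class EqualTo(attribute: String, value: String) extends SimpleFilter
final case class In(attribute: String, values: Seq[String]) extends SimpleFilter
final case class And(left: SimpleFilter, right: SimpleFilter) extends SimpleFilter

object FilterConversion {
  type PartitionValues = Map[String, String]

  // Translate a pushed-down filter into a predicate over one partition's
  // values; the real conversion would build a Catalyst Expression instead.
  def toPredicate(f: SimpleFilter): PartitionValues => Boolean = f match {
    case EqualTo(attr, v) => p => p.get(attr).contains(v)
    case In(attr, vs)     => p => p.get(attr).exists(vs.contains)
    case And(l, r)        => p => toPredicate(l)(p) && toPredicate(r)(p)
  }
}
```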
About the implementation on Spark 3.x (supported:
https://issues.apache.org/jira/browse/SPARK-30428),
we can implement a `MergeOnReadSnapshotScan` similar to `CSVScan`.
The current implementation is a simplified approach (it has been deployed to our
test cluster); shall we discuss it in more detail?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]