yui2010 commented on a change in pull request #2378:
URL: https://github.com/apache/hudi/pull/2378#discussion_r589526502
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -504,6 +506,42 @@ class TestMORDataSource extends HoodieClientTestBase {
hudiSnapshotDF2.show(1)
}
+ @Test
+ def testPrunePartitions() {
+ // First Operation:
+ // Producing parquet files to three hive style partitions like /partition=20150316/.
+ // SNAPSHOT view on MOR table with parquet files only.
+ dataGen.setPartitionPaths(Array("20150316","20150317","20160315"));
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
Review comment:
Hi, @garyli1019 and @nsivabalan
Yes, in fact the case where the partition column schema is not part of the data schema does exist, and I wrote a test case (testPrunePartitionsWithAppendedPartitionsSchema) to cover it.
We have three partition values: "2015/03/16", "2015/03/17", "2016/03/15", and the path tree looks as follows:
```
path
└── to
    └── table
        ├── year=2015
        │   └── month=03
        │       ├── day=16
        │       │   └── data.parquet
        │       └── day=17
        │           └── data.parquet
        └── year=2016
            └── month=03
                └── day=15
                    └── data.parquet
```
And I have a problem with **the misplacement of the returned data**:
```scala
val sql = "select month, seconds_since_epoch, year from mor_test_partition_table where year < '2016' and month > '01'"
spark.sql(sql).show(1)
```
(selecting `seconds_since_epoch, year, month` in that order is OK)
It returned an incorrect result:
```
+----------+-------------------+----+
|     month|seconds_since_epoch|year|
+----------+-------------------+----+
|1556774336|               2015|   3|
+----------+-------------------+----+
```
When we use spark.read.format("parquet") with the same query:
```scala
val sql = "select month, seconds_since_epoch, year from mor_test_partition_table where year < '2016' and month > '01'"
spark.sql(sql).show(1)
```
it returns the correct result:
```
+-----+-------------------+----+
|month|seconds_since_epoch|year|
+-----+-------------------+----+
|    3|7574769756561934803|2015|
+-----+-------------------+----+
```
I investigated the Spark code and found that the cause of this incorrect result is the difference between RowDataSourceScanExec (the datasource path, e.g. `format("org.apache.hudi")`) and FileSourceScanExec (the Spark native path, e.g. `format("parquet")`):
1. When Spark queries parquet natively, the parquet record reader returns the InternalRow values for the data schema columns (the data schema does not include the partition columns) and then appends the partition values, see [VectorizedParquetRecordReader](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L208):
```java
if (partitionColumns != null) {
  int partitionIdx = sparkSchema.fields().length;
  for (int i = 0; i < partitionColumns.fields().length; i++) {
    ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
    columnVectors[i + partitionIdx].setIsConstant();
  }
}
```
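To make that rule concrete, here is a tiny Spark-free sketch (all names are illustrative, not the real Spark API): data-schema columns fill the first slots of the row buffer, and partition values are appended after them, with the data-column count playing the role of `sparkSchema.fields().length`.

```java
import java.util.Arrays;

// Simplified model of the layout produced by the reader above:
// data columns occupy slots 0..n-1, partition values slots n..n+m-1.
public class AppendPartitionValuesSketch {
    static Object[] buildRow(Object[] dataValues, Object[] partitionValues) {
        Object[] row = new Object[dataValues.length + partitionValues.length];
        // data-schema columns come first ...
        System.arraycopy(dataValues, 0, row, 0, dataValues.length);
        // ... then partition values are populated starting at partitionIdx
        int partitionIdx = dataValues.length;
        System.arraycopy(partitionValues, 0, row, partitionIdx, partitionValues.length);
        return row;
    }

    public static void main(String[] args) {
        Object[] row = buildRow(new Object[]{7574769756561934803L},
                                new Object[]{2015, 3, 16});
        System.out.println(Arrays.toString(row));
        // prints: [7574769756561934803, 2015, 3, 16]
    }
}
```

So whatever order the query asks for, the buffer is always physically laid out as data columns followed by partition columns.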
2. RowDataSourceScanExec and FileSourceScanExec both have an `output: Seq[Attribute]`, and there is a default correspondence rule: the order of the columnVectors corresponds to the order of the output attributes.
**[FileSourceScanExec](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L199) uses the readDataColumns ++ partitionColumns order:**
```scala
val readDataColumns =
  dataColumns
    .filter(requiredAttributes.contains)
    .filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

val outputAttributes = readDataColumns ++ partitionColumns

val scan =
  FileSourceScanExec(
    fsRelation,
    outputAttributes,
    outputSchema,
    partitionKeyFilters.toSeq,
    bucketSet,
    dataFilters,
    table.map(_.identifier))
```
e.g. columnVectors[0] ~ output(0), columnVectors[1] ~ output(1), ...
**[RowDataSourceScanExec](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L78) uses the requiredColumns order:**
```scala
/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
    fullOutput: Seq[Attribute],
    requiredColumnsIndex: Seq[Int],
    filters: Set[Filter],
    handledFilters: Set[Filter],
    rdd: RDD[InternalRow],
    @transient relation: BaseRelation,
    override val tableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec {

  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
```
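A mini-model of the two ordering rules (illustrative names only, not the real Spark API) shows what `requiredColumnsIndex.map(fullOutput)` does for the query `select month, seconds_since_epoch, year`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// fullOutput is laid out as data columns ++ partition columns, matching the
// physical row buffer described in point 1.
public class OutputOrderSketch {
    static final List<String> FULL_OUTPUT =
        Arrays.asList("seconds_since_epoch", "year", "month");

    // FileSourceScanExec: output = readDataColumns ++ partitionColumns,
    // i.e. the same order as the physical buffer.
    static List<String> fileSourceOutput() {
        return FULL_OUTPUT;
    }

    // RowDataSourceScanExec: output = requiredColumnsIndex.map(fullOutput),
    // i.e. the order in which the columns appear in the query.
    static List<String> rowSourceOutput(List<Integer> requiredColumnsIndex) {
        return requiredColumnsIndex.stream()
                                   .map(FULL_OUTPUT::get)
                                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "select month, seconds_since_epoch, year" -> indices 2, 0, 1
        System.out.println(rowSourceOutput(Arrays.asList(2, 0, 1)));
        // prints: [month, seconds_since_epoch, year]
    }
}
```

The two scan nodes therefore disagree on the attribute order while the underlying columnVectors stay in the physical order.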
3. BufferedRowIterator rebuilds the InternalRow value order from [projectList](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala#L135)'s index into the child output:
`val exprs = projectList.map(x => BindReferences.bindReference[Expression](x, child.output))`
So FileSourceScanExec, using the **readDataColumns ++ partitionColumns order**, returns the correct result:
```
project_month ~ output(2) ~ columnVectors[2]
project_seconds_since_epoch ~ output(0) ~ columnVectors[0]
project_year ~ output(1) ~ columnVectors[1]
+-----+------------------------+------+
|month| seconds_since_epoch | year |
+-----+------------------------+------+
| 3 |7574769756561934803| 2015 |
+-----+------------------------+------+
```
while RowDataSourceScanExec, using the **requiredColumns order**, returns an incorrect result:
```
project_month ~ output(0) ~ columnVectors[0] ~ seconds_since_epoch value
project_seconds_since_epoch ~ output(1) ~ columnVectors[1] ~ year value
project_year ~ output(2) ~ columnVectors[2] ~ month value
+----------+-------------------+----+
| month |seconds_since_epoch|year|
+----------+-------------------+----+
|1556774336| 2015 | 3 |
+----------+-------------------+----+
```
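The misplacement can be reproduced without Spark (a simplified sketch with hypothetical names): the physical buffer is always laid out as readDataColumns ++ partitionColumns, but each projected column is bound to its ordinal position in `output`, so a mismatched `output` order reads the wrong slots:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MisplacementSketch {
    // physical layout: seconds_since_epoch (data), then year, month (partition)
    static final List<String> BUFFER =
        Arrays.asList("7574769756561934803", "2015", "3");
    static final List<String> PHYSICAL_ORDER =
        Arrays.asList("seconds_since_epoch", "year", "month");

    // bind each wanted column to its ordinal in `output`, then read BUFFER
    // at that ordinal, mimicking BindReferences + the columnVectors rule
    static List<String> project(List<String> output, List<String> wanted) {
        return wanted.stream()
                     .map(name -> BUFFER.get(output.indexOf(name)))
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> wanted = Arrays.asList("month", "seconds_since_epoch", "year");
        // FileSourceScanExec: output matches the physical layout -> correct row
        System.out.println(project(PHYSICAL_ORDER, wanted));
        // prints: [3, 7574769756561934803, 2015]

        // RowDataSourceScanExec: output is in requiredColumns order -> every
        // value lands in the wrong slot
        System.out.println(project(wanted, wanted));
        // prints: [7574769756561934803, 2015, 3]
    }
}
```

The second projection reproduces exactly the wrong row shown in the table above: `month` receives the seconds_since_epoch value, and so on.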
So, if I understand correctly, it is very difficult to achieve this with the Spark 2.x Data Source V1 code, since there is no API to override RowDataSourceScanExec's output attributes.
I also researched Data Source V2; some findings:
1. Data Source V2 API in Spark 2.x: [SPARK-15689](https://issues.apache.org/jira/browse/SPARK-15689)
We can implement the HoodieFileDataSourceReader#readSchema method, and after [pruneColumns](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala#L108) we can get the **readDataColumns ++ partitionColumns order**.
2. Data Source V2 API in Spark 3.x: [SPARK-22386](https://issues.apache.org/jira/browse/SPARK-22386)
The implementation may be easier because there are already some native cases, e.g. [SPARK-30428](https://issues.apache.org/jira/browse/SPARK-30428).
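A rough sketch of the V2 idea. HoodieFileDataSourceReader#readSchema is the method mentioned above, but this class, its helper, and modeling schemas as plain string lists are assumptions for illustration only: after column pruning, the reader reports its schema in readDataColumns ++ partitionColumns order, so the engine binds positions against that layout instead of assuming the requiredColumns order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReadSchemaSketch {
    static final List<String> DATA_COLUMNS =
        Arrays.asList("seconds_since_epoch", "begin_lat");
    static final List<String> PARTITION_COLUMNS =
        Arrays.asList("year", "month", "day");

    // what a readSchema() could report after pruneColumns(required):
    // pruned data columns first, then pruned partition columns
    static List<String> readSchema(List<String> required) {
        List<String> pruned = new ArrayList<>();
        for (String c : DATA_COLUMNS) {          // data columns first
            if (required.contains(c)) pruned.add(c);
        }
        for (String c : PARTITION_COLUMNS) {     // then partition columns
            if (required.contains(c)) pruned.add(c);
        }
        return pruned;
    }

    public static void main(String[] args) {
        System.out.println(readSchema(Arrays.asList("month", "seconds_since_epoch", "year")));
        // prints: [seconds_since_epoch, year, month]
    }
}
```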
So will the community consider introducing V2 in the long run?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]