yui2010 commented on a change in pull request #2378:
URL: https://github.com/apache/hudi/pull/2378#discussion_r589526502



##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -504,6 +506,42 @@ class TestMORDataSource extends HoodieClientTestBase {
     hudiSnapshotDF2.show(1)
   }
 
+  @Test
+  def testPrunePartitions() {
+    // First Operation:
+    // Producing parquet files to three hive style partitions like 
/partition=20150316/.
+    // SNAPSHOT view on MOR table with parquet files only.
+    dataGen.setPartitionPaths(Array("20150316","20150317","20160315"));
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")

Review comment:
       Hi, @garyli1019 and @nsivabalan
     yes, In fact, the case about  the partition column schema have not in data 
schema does exist and i write 
testcase(testPrunePartitionsWithAppendedPartitionsSchema) to cover this. 
   
   // we have three partition values: "2015/03/16","2015/03/17","2016/03/15", 
path view as follow:
   //    path
   //    └── to
   //      └── table
   //      ├── year=2015
   //    │   ├── month=03
   //          ├── day=16
   //    │   │   └── data.parquet
   //          ├── day=17
   //    │   │   └── data.parquet
   //      ├── year=2016
   //    │   ├── month=03
   //          ├── day=15
   //    │   │   └── data.parquet
   
   and i have a problem with the misplacement of the returned data.
   
   val sql = "select month, seconds_since_epoch, year  from 
mor_test_partition_table where year < '2016' and month > '01'" (select 
seconds_since_epoch, year, month  is ok )
   spark.sql(sql).show(1)
   
   it returned incorrect result as follow:
   +----------+-------------------+----+
   |     month  |seconds_since_epoch|year|
   +----------+-------------------+----+
   |1556774336|               2015       |   3  |
   +----------+-------------------+----+
   
   when we use spark.read.format("parquet")
   val sql = "select month, seconds_since_epoch, year  from 
mor_test_partition_table where year < '2016' and month > '01'"
   spark.sql(sql).show(1)
   it returned the correct result as follow: 
   +-----+------------------------+------+
   |month| seconds_since_epoch   | year    |
   +-----+------------------------+------+
   |    3    |7574769756561934803| 2015   |
   +-----+------------------------+------+
   
   and I investigated the spark code and found that the cause of this incorrect 
result was due to the different of RowDataSourceScanExec(datasource using eg. 
format("org.apache.hudi")) and FileSourceScanExec(spark native using eg. 
format("parquet"))
   
   1. when we using spark query parquet, ParquetRecordReader will return the 
internalRow value with data schema column(not contains partition schema column) 
value and append partition Values
   
   
[VectorizedParquetRecordReader](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L208)
   ```
       if (partitionColumns != null) {
         int partitionIdx = sparkSchema.fields().length;
         for (int i = 0; i < partitionColumns.fields().length; i++) {
           ColumnVectorUtils.populate(columnVectors[i + partitionIdx], 
partitionValues, i);
           columnVectors[i + partitionIdx].setIsConstant();
         }
       }
   ```
   
   
   2. RowDataSourceScanExec and FileSourceScanExec have the output: 
Seq[Attribute]
   there is a default correspondence rule: the order of columnVectors 
corresponds to the order of output attributes 
   
   
**[FileSourceScanExec](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L199)
 using readDataColumns ++ partitionColumns order**
   
   ```
         val readDataColumns =
           dataColumns
             .filter(requiredAttributes.contains)
             .filterNot(partitionColumns.contains)
         val outputSchema = readDataColumns.toStructType
         logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
   
         val outputAttributes = readDataColumns ++ partitionColumns
   
         val scan =
           FileSourceScanExec(
             fsRelation,
             outputAttributes,
             outputSchema,
             partitionKeyFilters.toSeq,
             bucketSet,
             dataFilters,
             table.map(_.identifier))
   ```
   such as: columnVectors[0] ~ output(0),columnVectors[1] ~ output(1),...
   
   
   
**[RowDataSourceScanExec](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L78)
 using requiredColumns order**
   ```
   _/** Physical plan node for scanning data from a relation. */
   case class RowDataSourceScanExec(
       fullOutput: Seq[Attribute],
       requiredColumnsIndex: Seq[Int],
       filters: Set[Filter],
       handledFilters: Set[Filter],
       rdd: RDD[InternalRow],
       @transient relation: BaseRelation,
       override val tableIdentifier: Option[TableIdentifier])
     extends DataSourceScanExec {
   
     def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
   ```
   
   
   3. BufferedRowIterator will rebuild internalRow value order from 
[projectList](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala#L135)'s
 index of output
       `val exprs = projectList.map(x => 
BindReferences.bindReference[Expression](x, child.output))`
   
   so FileSourceScanExec using **readDataColumns ++ partitionColumns order**  
return corrected result 
   ```
   project_month ~ output(2) ~ columnVectors[2]
   project_seconds_since_epoch ~ output(0) ~ columnVectors[0]
   project_year ~ output(1) ~ columnVectors[1]
   
   +-----+------------------------+------+
   |month| seconds_since_epoch   | year    |
   +-----+------------------------+------+
   |    3    |7574769756561934803| 2015   |
   +-----+------------------------+------+
   ```
   
   RowDataSourceScanExec using **requiredColumns order**  return incorrect 
result 
   ```
   project_month ~ output(0) ~ columnVectors[0] ~ seconds_since_epoch value
   project_seconds_since_epoch ~ output(1) ~ columnVectors[1] ~ year value
   project_year ~ output(2) ~ columnVectors[2] ~ month value
   
   +----------+-------------------+----+
   |     month  |seconds_since_epoch|year|
   +----------+-------------------+----+
   |1556774336|        2015       |   3  |
   +----------+-------------------+----+
   ```
   so it's very difficult to use spark2.x data source V1 code to achieve this 
function (have not api override RowDataSourceScanExec's output attribute) if i 
understand correctly
   and i researched datasource v2 some information as follow:
   1. datasource v2 api  spark 2.x 
[https://issues.apache.org/jira/browse/SPARK-15689](https://issues.apache.org/jira/browse/SPARK-15689)
     we can implement HoodieFileDataSourceReader#readSchema method 
   and after 
[pruneColumns](https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala#L108)
 we can get **readDataColumns ++ partitionColumns order** 
   
   2. datasource v2 api  spark 3.x 
[https://issues.apache.org/jira/browse/SPARK-22386](https://issues.apache.org/jira/browse/SPARK-22386)
  
   implementation will may be easier because there is already some native case 
[https://issues.apache.org/jira/browse/SPARK-30428](https://issues.apache.org/jira/browse/SPARK-30428)
   
   so will the community consider introducing V2 for the long view (not just 
for this function) ?
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to