yihua commented on code in PR #11770:
URL: https://github.com/apache/hudi/pull/11770#discussion_r1742846700


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -75,6 +78,19 @@ abstract class HoodieBaseHadoopFsRelationFactory(val 
sqlContext: SQLContext,
   protected lazy val basePath: StoragePath = metaClient.getBasePath
   protected lazy val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
 
+  private lazy val keygenTypeHasVariablePartitionCols = 
!isNullOrEmpty(tableConfig.getKeyGeneratorClassName) &&
+    
(tableConfig.getKeyGeneratorClassName.equals("org.apache.hudi.keygen.TimestampBasedKeyGenerator")
 ||

Review Comment:
   Use `TimestampBasedKeyGenerator.class.getName` instead of hardcoding the 
class path.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -1494,15 +1494,11 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
 
     assert(firstDF.count() == 2)
 
-    // data_date is the partition field. Persist to the parquet file using the 
origin values, and read it.
-    // TODO(HUDI-3204) we have to revert this to pre-existing behavior from 
0.10

Review Comment:
   How hard will it be to fix the existing code path (0.x, 1.x with file group 
reader disabled)?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -231,4 +270,32 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       override def close(): Unit = closeableFileGroupRecordIterator.close()
     }
   }
+
+  private def readBaseFile(file: PartitionedFile, parquetFileReader: 
SparkParquetReader, requestedSchema: StructType,
+                           remainingPartitionSchema: StructType, 
fixedPartitionIndexes: Set[Int], requiredSchema: StructType,
+                           partitionSchema: StructType, outputSchema: 
StructType, filters: Seq[Filter],
+                           storageConf: StorageConfiguration[Configuration]): 
Iterator[InternalRow] = {
+    if (remainingPartitionSchema.fields.length == 
partitionSchema.fields.length) {
+      parquetFileReader.read(file, requiredSchema, partitionSchema, 
internalSchemaOpt, filters, storageConf)
+    } else if (remainingPartitionSchema.fields.length == 0) {
+      val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
+      val modifiedFile = pfileUtils.createPartitionedFile(InternalRow.empty, 
pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
+      parquetFileReader.read(modifiedFile, outputSchema, new StructType(), 
internalSchemaOpt, filters, storageConf)
+    } else {
+      val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
+      val partitionValues = 
InternalRow.fromSeq(file.partitionValues.toSeq(partitionSchema).zipWithIndex.filter(p
 => fixedPartitionIndexes.contains(p._2)).map(p => p._1))
+      val modifiedFile = pfileUtils.createPartitionedFile(partitionValues, 
pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
+      val iter = parquetFileReader.read(modifiedFile, requestedSchema, 
remainingPartitionSchema, internalSchemaOpt, filters, storageConf)
+      projectIter(iter, StructType(requestedSchema.fields ++ 
remainingPartitionSchema.fields), outputSchema)
+    }

Review Comment:
   Could you also add Javadocs here to note what the logic does in each case, 
like what you mentioned in your annotation review comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to