yihua commented on code in PR #11770:
URL: https://github.com/apache/hudi/pull/11770#discussion_r1742846700
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -75,6 +78,19 @@ abstract class HoodieBaseHadoopFsRelationFactory(val
sqlContext: SQLContext,
protected lazy val basePath: StoragePath = metaClient.getBasePath
protected lazy val partitionColumns: Array[String] =
tableConfig.getPartitionFields.orElse(Array.empty)
+  private lazy val keygenTypeHasVariablePartitionCols =
+    !isNullOrEmpty(tableConfig.getKeyGeneratorClassName) &&
+      (tableConfig.getKeyGeneratorClassName.equals("org.apache.hudi.keygen.TimestampBasedKeyGenerator") ||
Review Comment:
Use `classOf[TimestampBasedKeyGenerator].getName` (the Scala equivalent of
Java's `TimestampBasedKeyGenerator.class.getName`) instead of hardcoding the
fully-qualified class name as a string literal.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -1494,15 +1494,11 @@ class TestCOWDataSource extends
HoodieSparkClientTestBase with ScalaAssertionSup
assert(firstDF.count() == 2)
- // data_date is the partition field. Persist to the parquet file using the
origin values, and read it.
- // TODO(HUDI-3204) we have to revert this to pre-existing behavior from
0.10
Review Comment:
How hard will it be to fix the existing code path (0.x, 1.x with file group
reader disabled)?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -231,4 +270,32 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
override def close(): Unit = closeableFileGroupRecordIterator.close()
}
}
+
+  private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkParquetReader,
+                           requestedSchema: StructType, remainingPartitionSchema: StructType,
+                           fixedPartitionIndexes: Set[Int], requiredSchema: StructType,
+                           partitionSchema: StructType, outputSchema: StructType,
+                           filters: Seq[Filter],
+                           storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+    if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) {
+      parquetFileReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
+    } else if (remainingPartitionSchema.fields.length == 0) {
+      val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
+      val modifiedFile = pfileUtils.createPartitionedFile(InternalRow.empty,
+        pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
+      parquetFileReader.read(modifiedFile, outputSchema, new StructType(), internalSchemaOpt, filters, storageConf)
+    } else {
+      val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
+      val partitionValues = InternalRow.fromSeq(file.partitionValues.toSeq(partitionSchema)
+        .zipWithIndex.filter(p => fixedPartitionIndexes.contains(p._2)).map(p => p._1))
+      val modifiedFile = pfileUtils.createPartitionedFile(partitionValues,
+        pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
+      val iter = parquetFileReader.read(modifiedFile, requestedSchema, remainingPartitionSchema,
+        internalSchemaOpt, filters, storageConf)
+      projectIter(iter, StructType(requestedSchema.fields ++ remainingPartitionSchema.fields), outputSchema)
+    }
Review Comment:
Could you also add Scaladoc here documenting what the logic does in each of the
three cases, as you described in your earlier review comment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]