nsivabalan commented on code in PR #17601:
URL: https://github.com/apache/hudi/pull/17601#discussion_r2807589377
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -111,7 +111,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected def tableName: String = metaClient.getTableConfig.getTableName
-  protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+  protected lazy val conf: Configuration = {
+    val c = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
Review Comment:
I am not sure this is used anywhere.
Can we triage this and remove it if it is not needed?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -244,7 +249,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       case HoodieFileFormat.PARQUET =>
         // We're delegating to Spark to append partition values to every row only in cases
         // when these corresponding partition-values are not persisted w/in the data file itself
-        val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
+        val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(
Review Comment:
This line is executed on the driver, and we are already modifying it to pass in additional arguments.
Why can't we compute the value of `hasTimestampMillisFieldInTableSchema` here and pass it in as one more argument?
That way we avoid recomputing it on every executor and for every data file.
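Roughly what I have in mind (just a sketch; the two-argument `createLegacyHoodieParquetFileFormat` signature and the `tableAvroSchema` reference are assumptions about how this PR's API could look, not the current code):

```scala
// Driver side: derive the flag once from the table's Avro schema.
val hasTimestampMillisFieldInTableSchema: Boolean =
  AvroSchemaRepair.hasTimestampMillisField(tableAvroSchema)

// Hand the precomputed flag to the file format instead of (or alongside) the
// schema, so executors don't re-derive it for every data file.
val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(
  shouldExtractPartitionValuesFromPartitionPath,
  hasTimestampMillisFieldInTableSchema).get
```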
##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java:
##########
@@ -36,19 +40,23 @@
  */
 public class HoodieAvroReadSupport<T> extends AvroReadSupport<T> {
-  public HoodieAvroReadSupport(GenericData model) {
+  private Option<MessageType> tableSchema;
+
+  public HoodieAvroReadSupport(GenericData model, Option<MessageType> tableSchema) {
Review Comment:
Same comment as elsewhere: we are modifying the constructor to take in the table schema.
Why can't we pass in the repaired schema directly,
or at least the value of `hasTimestampMillisFieldInTableSchema`,
so we can use it at L59?
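For illustration, something like this (a sketch only, written in Scala for brevity even though the real class is Java; the boolean constructor argument is my suggestion, not the PR's current API):

```scala
import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroReadSupport

// Accept the precomputed flag instead of the full table schema, so the read
// support doesn't have to re-derive it for every data file it opens.
class HoodieAvroReadSupport[T](model: GenericData,
                               val hasTimestampMillisFieldInTableSchema: Boolean)
  extends AvroReadSupport[T](model)
```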
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala:
##########
@@ -63,15 +69,31 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
  * <li>Schema on-read</li>
  * </ol>
  */
-class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean,
+                                           private val avroTableSchema: Schema) extends ParquetFileFormat with Logging {
+  private lazy val tableSchemaAsMessageType: HOption[MessageType] = {
+    if (avroTableSchema == null) {
+      HOption.empty()
+    } else {
+      HOption.ofNullable(
+        ParquetTableSchemaResolver.convertAvroSchemaToParquet(avroTableSchema, new Configuration())
+      )
+    }
+  }
+  private lazy val hasTimestampMillisFieldInTableSchema = if (avroTableSchema == null) {
+    true
+  } else {
+    AvroSchemaRepair.hasTimestampMillisField(avroTableSchema)
+  }
+  private lazy val supportBatchWithTableSchema = HoodieSparkUtils.gteqSpark3_5 || !hasTimestampMillisFieldInTableSchema
Review Comment:
Hey @yihua: can you review this change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]