n3nash commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r628787271
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -148,12 +148,21 @@ private[hudi] object HoodieSparkSqlWriter {
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
-    val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+    var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
     sparkContext.getConf.registerAvroSchemas(schema)
     log.info(s"Registered avro schema : ${schema.toString(true)}")
+    var (convertGenRecsToLatestTableSchema, latestSchema) = schemaNeedsConversion(fs, basePath, sparkContext, schema)
     // Convert to RDD[HoodieRecord]
-    val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
+    var genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
+    if (convertGenRecsToLatestTableSchema) { // if incoming batch is of old schema but table has evolved schema,
Review comment:
Shouldn't this already be taken care of by the logic internally? If you
just set `var schema = convertSchemaIfNeeded(fs, basePath, sparkContext,
schema)`, simply return the latest schema, and pass that schema to
`HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)`, internally we
should be making sure that the passed schema (which was earlier df.schema) is
honored. Can you explain why we need this "if" condition?
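
The simplification the reviewer suggests could look roughly like the sketch
below. This is not runnable on its own (it relies on Hudi/Spark context from
the surrounding method), and `convertSchemaIfNeeded` is the hypothetical
helper named in the comment, with an assumed signature:

```scala
// Sketch of the reviewer's suggestion: resolve the schema to use up front,
// then pass it straight into createRdd, avoiding a separate "if" branch.
// `convertSchemaIfNeeded` is hypothetical; it is assumed to return the
// latest table schema when the table has evolved, else the input schema.
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
schema = convertSchemaIfNeeded(fs, basePath, sparkContext, schema)
sparkContext.getConf.registerAvroSchemas(schema)
// createRdd is expected to honor the schema it is given, so the generic
// records would already be produced against the latest table schema here.
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
```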
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]