nsivabalan commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r655052508
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -89,22 +89,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
     val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema, structName, recordNamespace)
+    // if upgradeToLatestSchemaIfNeeded is set to true and latestSchema is not null, then try to leverage latestSchema
+    // this code path will handle situations where records are serialized in schema1, but callers wish to convert to
+    // Rdd[GenericRecord] using different schema (could be evolved schema or could be latest table schema)
+    if (upgradeToLatestSchemaIfNeeded && latestSchema != null) {
Review comment:
There are quite a few callers that might hit this `if` condition.
Cases where latestSchema != null:
1. HoodieSparkSqlWriter regular operations (part of this patch, line 165). latestSchema could be the incoming batch's writer schema or the latest table schema.
2. DeltaSync.read
   a. Non-null transformer: SchemaProvider.getTargetSchema is passed in as latestSchema.
   b. Null transformer: latestSchema could be the incoming batch's writer schema or the latest table schema.
3. SourceFormatAdapter.fetchNewDataInAvroFormat (RowSource): if the schema provider is a FilebasedSchemaProvider, schemaProvider.getSourceSchema is passed in as latestSchema, but upgradeToLatestSchemaIfNeeded is explicitly set to false.
Note: in cases 1 and 2, upgradeToLatestSchemaIfNeeded takes whatever value the user configured.
So in those cases, if upgradeToLatestSchemaIfNeeded is true, we want to enter the branch at line 102; if it is false but latestSchema is still set, we may want to use latestSchema at line 106. If none of these scenarios apply, we want to use the df's schema (avroSchema) at line 106.
I am open to simplifying this code block by all means; suggestions are welcome.
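One possible simplification of the three-way choice described above is to factor the schema selection into a small pure helper. The sketch below is only illustrative, not the actual Hudi API: `selectWriteSchema` and `SchemaSelectionSketch` are hypothetical names, `Option[Schema]` replaces the nullable parameter, and plain strings stand in for Avro `Schema` objects.

```scala
// Illustrative sketch only: models the three-way schema choice described in
// the comment above. Strings stand in for Avro Schema objects, and
// selectWriteSchema is a hypothetical helper, not part of HoodieSparkUtils.
object SchemaSelectionSketch {
  // Returns (schemaToUse, needsUpgrade):
  //  - upgrade requested and a latest schema present -> rewrite records up to
  //    the latest schema (the "line 102" branch)
  //  - latest schema present, no upgrade requested   -> serialize directly
  //    with the latest schema (the "line 106" branch)
  //  - no latest schema                              -> fall back to the
  //    DataFrame-derived avroSchema
  def selectWriteSchema(dfAvroSchema: String,
                        latestSchema: Option[String],
                        upgradeToLatestSchemaIfNeeded: Boolean): (String, Boolean) =
    latestSchema match {
      case Some(latest) if upgradeToLatestSchemaIfNeeded => (latest, true)
      case Some(latest)                                  => (latest, false)
      case None                                          => (dfAvroSchema, false)
    }
}
```

Wrapping the nullable `latestSchema` in an `Option` at the call boundary would remove the explicit null checks and make all three cases visible in a single `match`.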
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]