nsivabalan commented on code in PR #5347:
URL: https://github.com/apache/hudi/pull/5347#discussion_r852166660
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -130,7 +129,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
*/
def createRdd(df: DataFrame, structName: String, recordNamespace: String,
reconcileToLatestSchema: Boolean,
latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
- val latestTableSchemaConverted = if (latestTableSchema.isPresent && reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+ var latestTableSchemaConverted : Option[Schema] = None
+
+ if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+ latestTableSchemaConverted = Some(latestTableSchema.get())
+ } else {
+ // cases when users want to use latestTableSchema but have not turned on reconcileToLatestSchema explicitly
+ // for example, when using a Transformer implementation to transform source RDD to target RDD
+ latestTableSchemaConverted = if (latestTableSchema.isPresent) Some(latestTableSchema.get()) else None
Review Comment:
Looks like this was a regression introduced in one of the refactorings. We had this in 0.10.1:
```
var writeSchema : Schema = null;
var toReconcileSchema : Schema = null;
if (reconcileToLatestSchema && latestTableSchema.isPresent) {
  // if reconcileToLatestSchema is set to true and latestSchema is present, then try to leverage latestTableSchema.
  // this code path will handle situations where records are serialized in old schema, but callers wish to convert
  // to Rdd[GenericRecord] using different schema (could be evolved schema or could be latest table schema)
  writeSchema = dfWriteSchema
  toReconcileSchema = latestTableSchema.get()
} else {
  // there are paths where callers wish to use latestTableSchema to convert to Rdd[GenericRecords] and not use
  // row's schema. So use latestTableSchema if present. if not available, fallback to using row's schema.
  writeSchema = if (latestTableSchema.isPresent) { latestTableSchema.get() } else { dfWriteSchema }
}
createRddInternal(df, writeSchema, toReconcileSchema, structName, recordNamespace)
```
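For comparison, here is a minimal sketch of the same selection logic as a pure function, which makes the three cases easy to check in isolation. `SchemaChoice`, `SchemaSelection` and `chooseSchemas` are hypothetical names and not part of Hudi. The type parameter `S` stands in for the Avro `Schema`, and `scala.Option` replaces both the Hudi `Option` and the `null` defaults, so this shows the branching only, not the actual implementation.

```scala
// Hypothetical helper, not part of Hudi: isolates the schema choice from the 0.10.1 snippet.
// S stands in for org.apache.avro.Schema so the sketch runs without Avro on the classpath.
final case class SchemaChoice[S](writeSchema: S, toReconcileSchema: Option[S])

object SchemaSelection {
  def chooseSchemas[S](dfWriteSchema: S,
                       latestTableSchema: Option[S],
                       reconcileToLatestSchema: Boolean): SchemaChoice[S] =
    latestTableSchema match {
      // Reconcile requested and table schema known: write with the row's schema,
      // then reconcile against the latest table schema.
      case Some(latest) if reconcileToLatestSchema =>
        SchemaChoice(dfWriteSchema, Some(latest))
      // Table schema known but reconcile not requested: convert using the table schema directly.
      case Some(latest) =>
        SchemaChoice(latest, None)
      // No table schema available: fall back to the row's schema.
      case None =>
        SchemaChoice(dfWriteSchema, None)
    }
}
```

The middle case is the one the regression dropped: `latestTableSchema` is present, `reconcileToLatestSchema` is false, and the table schema should still be used for conversion.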