nsivabalan commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r633009441



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
##########
@@ -40,8 +40,8 @@ public SparkAvroPostProcessor(TypedProperties props, 
JavaSparkContext jssc) {
 
   @Override
   public Schema processSchema(Schema schema) {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(
+    return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(

Review comment:
       @n3nash : wanted to bring to your notice on this change. Prior to this 
diff, looks like w/ schema post processor enabled, one can never set target 
schema to null bcoz, the post processor will try to invoke this call. So, if I 
am not wrong, the code path we have in DeltaSync.readFromSource(), where we 
check if userProvidedTargetSchema is null will never be invoked only(bcoz, 
always the target schema will be non null).  Can you confirm if my 
understanding is right. 
   If yes, may be when we introduced the post processor we missed this flow. 
   

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
##########
@@ -40,8 +40,8 @@ public SparkAvroPostProcessor(TypedProperties props, 
JavaSparkContext jssc) {
 
   @Override
   public Schema processSchema(Schema schema) {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(
+    return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(

Review comment:
       @n3nash : wanted to bring to your notice on this change. Prior to this 
diff, looks like w/ schema post processor enabled, one can never set target 
schema to null bcoz, the post processor will try to invoke this call. So, if I 
am not wrong, the code path we have in DeltaSync.readFromSource(), where we 
check if userProvidedTargetSchema is null will never be invoked only(bcoz, 
always the target schema will be non null).  Can you confirm if my 
understanding is right. 
   If yes, may be when we introduced the post processor we missed this flow. 
   btw, as you see I am removing the constraint. So, wanted to confirm that we 
are not breaking any other flow by allowing null for target schema. My tests in 
TestHoodieDeltaStreamer does tests all possible config knobs. but just wanted 
to be sure. 

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
##########
@@ -40,8 +40,8 @@ public SparkAvroPostProcessor(TypedProperties props, 
JavaSparkContext jssc) {
 
   @Override
   public Schema processSchema(Schema schema) {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(
+    return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(

Review comment:
       I mean, not providing any schema provider at all is supported. This is 
the case where someone wants to set the source schema but wants to set target 
schema to null bcoz, they have a transformer and so they do not know the actual 
schema. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -90,22 +91,45 @@ object HoodieSparkUtils {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): 
RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
mayBeUseLatestTableSchema: Boolean): RDD[GenericRecord] = {

Review comment:
       https://issues.apache.org/jira/browse/HUDI-1907

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -333,6 +333,14 @@ object DataSourceWriteOptions {
   val META_SYNC_CLIENT_TOOL_CLASS = "hoodie.meta.sync.client.tool.class"
   val DEFAULT_META_SYNC_CLIENT_TOOL_CLASS = classOf[HiveSyncTool].getName
 
+  /**
+   * When a new batch of write has records with old schema, but latest table 
schema got evolved, this config will
+   * upgrade the records to leverage latest table schema(default vals will be 
injected to missing fields).
+   * If not, the batch would fail.
+   */
+  val UPGRADE_OLD_SCHEMA_RECORDS_TO_LATEST_TABLE_SCHEMA_OPT_KEY = 
"hoodie.datasource.write.upgrade.old.schema.records.to.latest.table.schema"

Review comment:
       I am ok with this renaming. but in general, this was my rational for 
naming it that way. in general sense, schema mismatch could mean anything. it 
could mean, addition of new cols, or missing few cols, or renaming of col 
names, etc. And so wanted to be specific. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to