nsivabalan commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r633009441
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
##########
@@ -40,8 +40,8 @@ public SparkAvroPostProcessor(TypedProperties props,
JavaSparkContext jssc) {
@Override
public Schema processSchema(Schema schema) {
- return AvroConversionUtils.convertStructTypeToAvroSchema(
+ return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(
Review comment:
@n3nash : wanted to bring this change to your notice. Prior to this
diff, it looks like with the schema post processor enabled, one could never set
the target schema to null, because the post processor would always invoke this
call. So, if I am not wrong, the code path we have in
DeltaSync.readFromSource(), where we check if userProvidedTargetSchema is null,
could never be taken (the target schema would always be non-null). Can you
confirm whether my understanding is right?
If yes, maybe we missed this flow when we introduced the post processor.
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
##########
@@ -40,8 +40,8 @@ public SparkAvroPostProcessor(TypedProperties props,
JavaSparkContext jssc) {
@Override
public Schema processSchema(Schema schema) {
- return AvroConversionUtils.convertStructTypeToAvroSchema(
+ return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(
Review comment:
btw, as you can see, I am removing that constraint here. So I wanted to confirm
that we are not breaking any other flow by allowing a null target schema. My
tests in TestHoodieDeltaStreamer do exercise all the config knobs, but I just
wanted to be sure.
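For reference, the null guard under discussion can be sketched as a self-contained pattern. The `convert` helper below is only a stand-in for Hudi's `AvroConversionUtils` round trip, not the actual API:

```java
public class NullSafePostProcessor {

    // Stand-in for AvroConversionUtils.convertStructTypeToAvroSchema(...);
    // the real call round-trips the Avro schema through a Spark StructType
    // and would fail on a null input.
    static String convert(String schema) {
        return "converted:" + schema;
    }

    // Mirrors the diff: convert only when a schema was actually provided,
    // so a null target schema propagates instead of failing inside convert().
    static String processSchema(String schema) {
        return schema != null ? convert(schema) : null;
    }

    public static void main(String[] args) {
        System.out.println(processSchema("user")); // converted:user
        System.out.println(processSchema(null));   // null
    }
}
```

With this guard in place, a null target schema flows through the post processor untouched, so the null check in DeltaSync.readFromSource() becomes reachable again.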
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
##########
@@ -40,8 +40,8 @@ public SparkAvroPostProcessor(TypedProperties props,
JavaSparkContext jssc) {
@Override
public Schema processSchema(Schema schema) {
- return AvroConversionUtils.convertStructTypeToAvroSchema(
+ return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(
Review comment:
I mean, not providing any schema provider at all is supported. This is
the case where someone wants to set the source schema but set the target
schema to null, because they have a transformer and therefore do not know the
final schema.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -90,22 +91,45 @@ object HoodieSparkUtils {
new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty,
fileStatusCache)
}
- def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+ def createRdd(df: DataFrame, structName: String, recordNamespace: String, mayBeUseLatestTableSchema: Boolean): RDD[GenericRecord] = {
Review comment:
https://issues.apache.org/jira/browse/HUDI-1907
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -333,6 +333,14 @@ object DataSourceWriteOptions {
val META_SYNC_CLIENT_TOOL_CLASS = "hoodie.meta.sync.client.tool.class"
val DEFAULT_META_SYNC_CLIENT_TOOL_CLASS = classOf[HiveSyncTool].getName
+ /**
+ * When a new batch of writes contains records in an older schema while the latest table schema has evolved, this
+ * config upgrades those records to the latest table schema (default values are injected for the missing fields).
+ * If not set, the batch would fail.
+ */
+ val UPGRADE_OLD_SCHEMA_RECORDS_TO_LATEST_TABLE_SCHEMA_OPT_KEY = "hoodie.datasource.write.upgrade.old.schema.records.to.latest.table.schema"
Review comment:
I am ok with this renaming. but in general, this was my rational for
naming it that way. in general sense, schema mismatch could mean anything. it
could mean, addition of new cols, or missing few cols, or renaming of col
names, etc. And so wanted to be specific.
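To illustrate what this config is meant to do (this is not Hudi's actual implementation, and the field names are hypothetical), upgrading an old-schema record to the latest table schema amounts to overlaying the record's values on the new schema's defaults:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaUpgradeSketch {

    // Fill fields missing from an old-schema record with the latest
    // schema's default values; values present in the record win.
    static Map<String, Object> upgrade(Map<String, Object> oldRecord,
                                       Map<String, Object> latestDefaults) {
        Map<String, Object> upgraded = new LinkedHashMap<>(latestDefaults);
        upgraded.putAll(oldRecord);
        return upgraded;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new LinkedHashMap<>();
        defaults.put("id", 0);
        defaults.put("name", "");
        defaults.put("city", "unknown"); // new column added by schema evolution
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("id", 1);
        oldRecord.put("name", "alice");
        System.out.println(upgrade(oldRecord, defaults)); // {id=1, name=alice, city=unknown}
    }
}
```

Without such an upgrade step, a writer holding records in the older schema would fail against the evolved table schema, which is the failure mode the config doc comment describes.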
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]