This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 39123cd5ac5 [HUDI-7327] remove meta cols from incoming schema in 
stream sync (#10556)
39123cd5ac5 is described below

commit 39123cd5ac5e38b61c9372c378c40e2e4ac4eb1c
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Jan 25 16:15:43 2024 -0500

    [HUDI-7327] remove meta cols from incoming schema in stream sync (#10556)
    
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java    |  7 +++++++
 .../java/org/apache/hudi/common/config/HoodieCommonConfig.java |  1 +
 .../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala  | 10 ++--------
 .../java/org/apache/hudi/utilities/streamer/StreamSync.java    |  2 +-
 .../utilities/deltastreamer/HoodieDeltaStreamerTestBase.java   |  2 ++
 5 files changed, 13 insertions(+), 9 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 9b925eb59be..f7893c415a2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -323,7 +323,14 @@ public class HoodieAvroUtils {
     return mergedSchema;
   }
 
+  public static boolean isSchemaNull(Schema schema) {
+    return schema == null || schema.getType() == Schema.Type.NULL;
+  }
+
   public static Schema removeMetadataFields(Schema schema) {
+    if (isSchemaNull(schema)) {
+      return schema;
+    }
     return removeFields(schema, 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 4e98ee27c9f..4dda018f535 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -85,6 +85,7 @@ public class HoodieCommonConfig extends HoodieConfig {
       .key("hoodie.write.handle.missing.cols.with.lossless.type.promotion")
       .defaultValue("false")
       .markAdvanced()
+      .withAlternatives("hoodie.write.set.null.for.missing.columns")
       .withDocumentation("When a nullable column is missing from incoming 
batch during a write operation, the write "
           + " operation will fail schema compatibility check. Setting this 
option to true will make the missing "
           + " column be filled with null values to successfully complete the 
write operation. Similarly lossless promotion"
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index bab0448642c..7e099166f28 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -137,18 +137,12 @@ object HoodieSparkSqlWriter {
    * <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
    * </ul>
    */
-  def deduceWriterSchema(sourceSchema: Schema,
-                         latestTableSchemaOpt: Option[Schema],
-                         internalSchemaOpt: Option[InternalSchema],
-                         opts: Map[String, String]): Schema = {
-    HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt, 
internalSchemaOpt, opts)
-  }
-
   def deduceWriterSchema(sourceSchema: Schema,
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          props: TypedProperties): Schema = {
-    deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, 
HoodieConversionUtils.fromProperties(props))
+    HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt,
+      internalSchemaOpt, HoodieConversionUtils.fromProperties(props))
   }
 
   def cleanup(): Unit = {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 27eb9ec0017..6babbd640f0 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -669,7 +669,7 @@ public class StreamSync implements Serializable, Closeable {
     // Deduce proper target (writer's) schema for the input dataset, 
reconciling its
     // schema w/ the table's one
     Schema targetSchema = HoodieSparkSqlWriter.deduceWriterSchema(
-          incomingSchema,
+          HoodieAvroUtils.removeMetadataFields(incomingSchema),
           HoodieConversionUtils.toScalaOption(latestTableSchemaOpt),
           HoodieConversionUtils.toScalaOption(internalSchemaOpt), props);
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index cb9c2b85aa3..d6dc56f5b5e 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -67,6 +67,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS;
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
 import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
@@ -609,6 +610,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
         cfg.schemaProviderClassName = schemaProviderClassName;
       }
       List<String> cfgs = new ArrayList<>();
+      cfgs.add(HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS.key() + 
"=true");
       
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" 
+ addReadLatestOnMissingCkpt);
       cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
       // No partition

Reply via email to