This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 39123cd5ac5 [HUDI-7327] remove meta cols from incoming schema in stream sync (#10556)
39123cd5ac5 is described below
commit 39123cd5ac5e38b61c9372c378c40e2e4ac4eb1c
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Jan 25 16:15:43 2024 -0500
[HUDI-7327] remove meta cols from incoming schema in stream sync (#10556)
---------
Co-authored-by: Jonathan Vexler <=>
---
.../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 7 +++++++
.../java/org/apache/hudi/common/config/HoodieCommonConfig.java | 1 +
.../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 10 ++--------
.../java/org/apache/hudi/utilities/streamer/StreamSync.java | 2 +-
.../utilities/deltastreamer/HoodieDeltaStreamerTestBase.java | 2 ++
5 files changed, 13 insertions(+), 9 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 9b925eb59be..f7893c415a2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -323,7 +323,14 @@ public class HoodieAvroUtils {
return mergedSchema;
}
+ public static boolean isSchemaNull(Schema schema) {
+ return schema == null || schema.getType() == Schema.Type.NULL;
+ }
+
public static Schema removeMetadataFields(Schema schema) {
+ if (isSchemaNull(schema)) {
+ return schema;
+ }
return removeFields(schema,
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 4e98ee27c9f..4dda018f535 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -85,6 +85,7 @@ public class HoodieCommonConfig extends HoodieConfig {
.key("hoodie.write.handle.missing.cols.with.lossless.type.promotion")
.defaultValue("false")
.markAdvanced()
+ .withAlternatives("hoodie.write.set.null.for.missing.columns")
.withDocumentation("When a nullable column is missing from incoming
batch during a write operation, the write "
+ " operation will fail schema compatibility check. Set this option
to true will make the missing "
+ " column be filled with null values to successfully complete the
write operation. Similarly lossless promotion"
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index bab0448642c..7e099166f28 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -137,18 +137,12 @@ object HoodieSparkSqlWriter {
* <li>Target table's schema (including Hudi's [[InternalSchema]]
representation)</li>
* </ul>
*/
- def deduceWriterSchema(sourceSchema: Schema,
- latestTableSchemaOpt: Option[Schema],
- internalSchemaOpt: Option[InternalSchema],
- opts: Map[String, String]): Schema = {
- HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt,
internalSchemaOpt, opts)
- }
-
def deduceWriterSchema(sourceSchema: Schema,
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
props: TypedProperties): Schema = {
- deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt,
HoodieConversionUtils.fromProperties(props))
+ HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt,
+ internalSchemaOpt, HoodieConversionUtils.fromProperties(props))
}
def cleanup(): Unit = {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 27eb9ec0017..6babbd640f0 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -669,7 +669,7 @@ public class StreamSync implements Serializable, Closeable {
// Deduce proper target (writer's) schema for the input dataset,
reconciling its
// schema w/ the table's one
Schema targetSchema = HoodieSparkSqlWriter.deduceWriterSchema(
- incomingSchema,
+ HoodieAvroUtils.removeMetadataFields(incomingSchema),
HoodieConversionUtils.toScalaOption(latestTableSchemaOpt),
HoodieConversionUtils.toScalaOption(internalSchemaOpt), props);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index cb9c2b85aa3..d6dc56f5b5e 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -67,6 +67,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS;
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
import static org.apache.hudi.common.util.StringUtils.nonEmpty;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
@@ -609,6 +610,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
cfg.schemaProviderClassName = schemaProviderClassName;
}
List<String> cfgs = new ArrayList<>();
+ cfgs.add(HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS.key() +
"=true");
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt="
+ addReadLatestOnMissingCkpt);
cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
// No partition