This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 68347473869 [HUDI-6958] Fixing config naming for OOB schema evol
support to handle missing cols and data type demotion. (#10085)
68347473869 is described below
commit 68347473869b4f34fd9d44627e3a265b8cebd943
Author: lokesh-lingarajan-0310
<[email protected]>
AuthorDate: Tue Nov 14 08:54:33 2023 -0800
[HUDI-6958] Fixing config naming for OOB schema evol support to handle
missing cols and data type demotion. (#10085)
Also adding a test case for handling missing non-nullable columns.
Co-authored-by: Lokesh Lingarajan
<[email protected]>
---
.../hudi/common/config/HoodieCommonConfig.java | 11 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 2 +-
.../scala/org/apache/hudi/HoodieSchemaUtils.scala | 4 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 130 ---------------------
...TestHoodieDeltaStreamerSchemaEvolutionBase.java | 2 +-
...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 80 +++++++++++++
6 files changed, 91 insertions(+), 138 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index ec96faf8d7e..4e98ee27c9f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -81,13 +81,16 @@ public class HoodieCommonConfig extends HoodieConfig {
+ " operation will fail schema compatibility check. Set this option
to true will make the newly added "
+ " column nullable to successfully complete the write operation.");
- public static final ConfigProperty<String> SET_NULL_FOR_MISSING_COLUMNS =
ConfigProperty
- .key("hoodie.write.set.null.for.missing.columns")
+ public static final ConfigProperty<String>
HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS = ConfigProperty
+ .key("hoodie.write.handle.missing.cols.with.lossless.type.promotion")
.defaultValue("false")
.markAdvanced()
- .withDocumentation("When a non-nullable column is missing from incoming
batch during a write operation, the write "
+ .withDocumentation("When a nullable column is missing from incoming
batch during a write operation, the write "
+ " operation will fail schema compatibility check. Set this option
to true will make the missing "
- + " column be filled with null values to successfully complete the
write operation.");
+ + " column be filled with null values to successfully complete the
write operation. Similarly, lossless promotions"
+ + " are type promotions that are not backward compatible, like long to
int or double to float; these can be handled "
+ + " by setting this config to true, in which case incoming data will
be promoted to the table schema type"
+ + " and written to the table.");
public static final ConfigProperty<ExternalSpillableMap.DiskMapType>
SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index efa9c9e692f..80f7cebd3a2 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -539,7 +539,7 @@ object DataSourceWriteOptions {
@Deprecated
val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] =
HoodieCommonConfig.RECONCILE_SCHEMA
- val SET_NULL_FOR_MISSING_COLUMNS: ConfigProperty[String] =
HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS
+ val HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS:
ConfigProperty[String] =
HoodieCommonConfig.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS
val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] =
HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index ed073ce4b17..c458a5ece6f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -76,8 +76,8 @@ object HoodieSchemaUtils {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
- val setNullForMissingColumns =
opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
-
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
+ val setNullForMissingColumns =
opts.getOrDefault(DataSourceWriteOptions.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS.key(),
+
DataSourceWriteOptions.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS.defaultValue).toBoolean
val shouldReconcileSchema =
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val shouldValidateSchemasCompatibility =
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 4504803377e..cbde026adeb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -554,136 +554,6 @@ class HoodieSparkSqlWriterInternal {
}
}
- /**
- * Deduces writer's schema based on
- * <ul>
- * <li>Source's schema</li>
- * <li>Target table's schema (including Hudi's [[InternalSchema]]
representation)</li>
- * </ul>
- */
- /*def deduceWriterSchema(sourceSchema: Schema,
- latestTableSchemaOpt: Option[Schema],
- internalSchemaOpt: Option[InternalSchema],
- opts: Map[String, String]): Schema = {
- val setNullForMissingColumns =
opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
-
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
- val shouldReconcileSchema =
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
- val shouldValidateSchemasCompatibility =
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
- HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
-
- latestTableSchemaOpt match {
- // In case table schema is empty we're just going to use the source
schema as a
- // writer's schema.
- case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
- // Otherwise, we need to make sure we reconcile incoming and latest
table schemas
- case Some(latestTableSchemaWithMetaFields) =>
- // NOTE: Meta-fields will be unconditionally injected by Hudi writing
handles, for the sake of
- // deducing proper writer schema we're stripping them to make
sure we can perform proper
- // analysis
- //add call to fix null ordering to ensure backwards compatibility
- val latestTableSchema =
AvroInternalSchemaConverter.fixNullOrdering(removeMetadataFields(latestTableSchemaWithMetaFields))
- // Before validating whether schemas are compatible, we need to
"canonicalize" source's schema
- // relative to the table's one, by doing a (minor) reconciliation of
the nullability constraints:
- // for ex, if in incoming schema column A is designated as non-null,
but it's designated as nullable
- // in the table's one we want to proceed aligning nullability
constraints w/ the table's schema
- // Also, we promote types to the latest table schema if possible.
- val shouldCanonicalizeSchema =
opts.getOrDefault(CANONICALIZE_SCHEMA.key,
- CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
- val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
- SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
-
- val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
- canonicalizeSchema(sourceSchema, latestTableSchema, opts)
- } else {
- AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
- }
-
- val allowAutoEvolutionColumnDrop =
opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
-
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
-
- if (shouldReconcileSchema) {
- internalSchemaOpt match {
- case Some(internalSchema) =>
- // Apply schema evolution, by auto-merging write schema and read
schema
- val mergedInternalSchema =
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
internalSchema)
- val evolvedSchema =
AvroInternalSchemaConverter.convert(mergedInternalSchema,
latestTableSchema.getFullName)
- val shouldRemoveMetaDataFromInternalSchema =
sourceSchema.getFields().filter(f =>
f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
- if (shouldRemoveMetaDataFromInternalSchema)
HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
- case None =>
- // In case schema reconciliation is enabled we will employ
(legacy) reconciliation
- // strategy to produce target writer's schema (see definition
below)
- val (reconciledSchema, isCompatible) =
- reconcileSchemasLegacy(latestTableSchema,
canonicalizedSourceSchema)
-
- // NOTE: In some cases we need to relax constraint of incoming
dataset's schema to be compatible
- // w/ the table's one and allow schemas to diverge. This
is required in cases where
- // partial updates will be performed (for ex, `MERGE INTO`
Spark SQL statement) and as such
- // only incoming dataset's projection has to match the
table's schema, and not the whole one
- if (!shouldValidateSchemasCompatibility || isCompatible) {
- reconciledSchema
- } else {
- log.error(
- s"""Failed to reconcile incoming batch schema with the
table's one.
- |Incoming schema ${sourceSchema.toString(true)}
- |Incoming schema (canonicalized)
${canonicalizedSourceSchema.toString(true)}
- |Table's schema ${latestTableSchema.toString(true)}
- |""".stripMargin)
- throw new SchemaCompatibilityException("Failed to reconcile
incoming schema with the table's one")
- }
- }
- } else {
- // In case reconciliation is disabled, we have to validate that the
source's schema
- // is compatible w/ the table's latest schema, such that we're able
to read existing table's
- // records using [[sourceSchema]].
- //
- // NOTE: In some cases we need to relax constraint of incoming
dataset's schema to be compatible
- // w/ the table's one and allow schemas to diverge. This is
required in cases where
- // partial updates will be performed (for ex, `MERGE INTO`
Spark SQL statement) and as such
- // only incoming dataset's projection has to match the table's
schema, and not the whole one
-
- if (mergeIntoWrites) {
- // if its merge into writes, do not check for projection nor
schema compatibility. Writers down the line will
- // take care of it.
- canonicalizedSourceSchema
- } else {
- if (!shouldValidateSchemasCompatibility) {
- // if no validation is enabled, check for col drop
- if (allowAutoEvolutionColumnDrop) {
- canonicalizedSourceSchema
- } else {
- val reconciledSchema = if (setNullForMissingColumns) {
-
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema)
- } else {
- canonicalizedSourceSchema
- }
- if (isValidEvolutionOf(reconciledSchema, latestTableSchema)) {
- reconciledSchema
- } else {
- log.error(
- s"""Incoming batch schema is not compatible with the
table's one.
- |Incoming schema ${sourceSchema.toString(true)}
- |Incoming schema (canonicalized)
${reconciledSchema.toString(true)}
- |Table's schema ${latestTableSchema.toString(true)}
- |""".stripMargin)
- throw new SchemaCompatibilityException("Incoming batch
schema is not compatible with the table's one")
- }
- }
- } else if (isSchemaCompatible(latestTableSchema,
canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) {
- canonicalizedSourceSchema
- } else {
- log.error(
- s"""Incoming batch schema is not compatible with the table's
one.
- |Incoming schema ${sourceSchema.toString(true)}
- |Incoming schema (canonicalized)
${canonicalizedSourceSchema.toString(true)}
- |Table's schema ${latestTableSchema.toString(true)}
- |""".stripMargin)
- throw new SchemaCompatibilityException("Incoming batch schema
is not compatible with the table's one")
- }
- }
- }
- }
- }*/
-
/**
* Resolve wildcards in partitions
*
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index d07a20c68b8..42c485ea2e2 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -150,7 +150,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
extraProps.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
"false");
extraProps.setProperty("hoodie.datasource.write.table.type", tableType);
extraProps.setProperty("hoodie.datasource.write.row.writer.enable",
rowWriterEnable.toString());
-
extraProps.setProperty(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS().key(),
Boolean.toString(nullForDeletedCols));
+
extraProps.setProperty(DataSourceWriteOptions.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS().key(),
Boolean.toString(nullForDeletedCols));
//we set to 0 so that we create new base files on insert instead of adding
inserts to existing filegroups via small file handling
extraProps.setProperty("hoodie.parquet.small.file.limit", "0");
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 36353b445c8..db7cb54fe76 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.deltastreamer;
+import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -391,6 +392,85 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
}
}
+ @ParameterizedTest
+ @MethodSource("testParamsWithSchemaTransformer")
+ public void testNonNullableColumnDrop(String tableType,
+ Boolean rowWriterEnable,
+ Boolean useKafkaSource,
+ Boolean allowNullForDeletedCols,
+ Boolean useTransformer,
+ Boolean targetSchemaSameAsTableSchema) throws
Exception {
+ this.tableType = tableType;
+ this.rowWriterEnable = rowWriterEnable;
+ this.useKafkaSource = useKafkaSource;
+ this.shouldCluster = false;
+ this.shouldCompact = false;
+ this.addFilegroups = false;
+ this.multiLogFiles = false;
+ this.useTransformer = useTransformer;
+ if (useKafkaSource || targetSchemaSameAsTableSchema) {
+ this.useSchemaProvider = true;
+ }
+
+ boolean isCow = tableType.equals("COPY_ON_WRITE");
+ PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
+ tableBasePath = basePath + "test_parquet_table" + testNum;
+
+ //first write
+ String datapath =
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
+ Dataset<Row> df = sparkSession.read().json(datapath);
+ df = TestHoodieSparkUtils.setColumnNotNullable(df, "rider");
+ resetTopicAndDeltaStreamer(allowNullForDeletedCols);
+ addData(df, true);
+ deltaStreamer.sync();
+ int numRecords = 6;
+ int numFiles = 3;
+ assertRecordCount(numRecords);
+ assertFileNumber(numFiles, isCow);
+
+ //add extra log files
+ if (tableType.equals("MERGE_ON_READ")) {
+ datapath =
String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath();
+ df = sparkSession.read().json(datapath);
+ df = TestHoodieSparkUtils.setColumnNotNullable(df, "rider");
+ addData(df, false);
+ deltaStreamer.sync();
+ //this write contains updates for the 6 records from the first write, so
+ //although we have 2 files for each filegroup, we only see the log files
+ //represented in the read. So that is why numFiles is 3, not 6
+ assertRecordCount(numRecords);
+ assertFileNumber(numFiles, false);
+ }
+
+ if (targetSchemaSameAsTableSchema) {
+ TestSchemaProvider.setTargetSchema(TestSchemaProvider.sourceSchema);
+ }
+ resetTopicAndDeltaStreamer(allowNullForDeletedCols);
+
+ HoodieStreamer.Config dsConfig = deltaStreamer.getConfig();
+ HoodieTableMetaClient metaClient = getMetaClient(dsConfig);
+ HoodieInstant lastInstant =
metaClient.getActiveTimeline().lastInstant().get();
+
+ // drop column
+ datapath =
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
+ df = sparkSession.read().json(datapath);
+ Dataset<Row> droppedColumnDf = df.drop("rider");
+ try {
+ addData(droppedColumnDf, true);
+ deltaStreamer.sync();
+ assertTrue(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
+
+ metaClient.reloadActiveTimeline();
+ Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, fs, dsConfig.targetBasePath, metaClient);
+
assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
+ .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
+
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("java.lang.NullPointerException")
+ || e.getMessage().contains("Incoming batch schema is not compatible
with the table's one"));
+ }
+ }
+
@ParameterizedTest
@MethodSource("testParamsWithSchemaTransformer")
public void testTypePromotion(String tableType,