This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 68347473869 [HUDI-6958] Fixing config naming for OOB schema evol support to handle missing cols and data type demotion. (#10085)
68347473869 is described below

commit 68347473869b4f34fd9d44627e3a265b8cebd943
Author: lokesh-lingarajan-0310 <[email protected]>
AuthorDate: Tue Nov 14 08:54:33 2023 -0800

    [HUDI-6958] Fixing config naming for OOB schema evol support to handle missing cols and data type demotion. (#10085)
    
    Also adding a test case for handling missing non-nullable columns.
    
    Co-authored-by: Lokesh Lingarajan <[email protected]>
---
 .../hudi/common/config/HoodieCommonConfig.java     |  11 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   2 +-
 .../scala/org/apache/hudi/HoodieSchemaUtils.scala  |   4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 130 ---------------------
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |   2 +-
 ...estHoodieDeltaStreamerSchemaEvolutionQuick.java |  80 +++++++++++++
 6 files changed, 91 insertions(+), 138 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index ec96faf8d7e..4e98ee27c9f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -81,13 +81,16 @@ public class HoodieCommonConfig extends HoodieConfig {
           + " operation will fail schema compatibility check. Set this option 
to true will make the newly added "
           + " column nullable to successfully complete the write operation.");
 
-  public static final ConfigProperty<String> SET_NULL_FOR_MISSING_COLUMNS = 
ConfigProperty
-      .key("hoodie.write.set.null.for.missing.columns")
+  public static final ConfigProperty<String> 
HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS = ConfigProperty
+      .key("hoodie.write.handle.missing.cols.with.lossless.type.promotion")
       .defaultValue("false")
       .markAdvanced()
-      .withDocumentation("When a non-nullable column is missing from incoming 
batch during a write operation, the write "
+      .withDocumentation("When a nullable column is missing from incoming 
batch during a write operation, the write "
           + " operation will fail schema compatibility check. Set this option 
to true will make the missing "
-          + " column be filled with null values to successfully complete the 
write operation.");
+          + " column be filled with null values to successfully complete the 
write operation. Similarly lossless promotion"
+          + " are type promotions that are not back compatible like long to 
int, double to float etc can be handled "
+          + " by setting this config to true, in which case incoming data will 
be promoted to the table schema type"
+          + " and written to the table.");
 
   public static final ConfigProperty<ExternalSpillableMap.DiskMapType> 
SPILLABLE_DISK_MAP_TYPE = ConfigProperty
       .key("hoodie.common.spillable.diskmap.type")
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index efa9c9e692f..80f7cebd3a2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -539,7 +539,7 @@ object DataSourceWriteOptions {
   @Deprecated
   val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = 
HoodieCommonConfig.RECONCILE_SCHEMA
 
-  val SET_NULL_FOR_MISSING_COLUMNS: ConfigProperty[String] = 
HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS
+  val HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS: 
ConfigProperty[String] = 
HoodieCommonConfig.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS
 
   val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] = 
HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index ed073ce4b17..c458a5ece6f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -76,8 +76,8 @@ object HoodieSchemaUtils {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          opts: Map[String, String]): Schema = {
-    val setNullForMissingColumns = 
opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
-      
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
+    val setNullForMissingColumns = 
opts.getOrDefault(DataSourceWriteOptions.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS.key(),
+      
DataSourceWriteOptions.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS.defaultValue).toBoolean
     val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
       HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 4504803377e..cbde026adeb 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -554,136 +554,6 @@ class HoodieSparkSqlWriterInternal {
     }
   }
 
-  /**
-   * Deduces writer's schema based on
-   * <ul>
-   *   <li>Source's schema</li>
-   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
-   * </ul>
-   */
-  /*def deduceWriterSchema(sourceSchema: Schema,
-                         latestTableSchemaOpt: Option[Schema],
-                         internalSchemaOpt: Option[InternalSchema],
-                         opts: Map[String, String]): Schema = {
-    val setNullForMissingColumns = 
opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
-      
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
-    val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
-    val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
-      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
-
-    latestTableSchemaOpt match {
-      // In case table schema is empty we're just going to use the source 
schema as a
-      // writer's schema.
-      case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
-      // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
-      case Some(latestTableSchemaWithMetaFields) =>
-        // NOTE: Meta-fields will be unconditionally injected by Hudi writing 
handles, for the sake of
-        //       deducing proper writer schema we're stripping them to make 
sure we can perform proper
-        //       analysis
-        //add call to fix null ordering to ensure backwards compatibility
-        val latestTableSchema = 
AvroInternalSchemaConverter.fixNullOrdering(removeMetadataFields(latestTableSchemaWithMetaFields))
-        // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
-        // relative to the table's one, by doing a (minor) reconciliation of 
the nullability constraints:
-        // for ex, if in incoming schema column A is designated as non-null, 
but it's designated as nullable
-        // in the table's one we want to proceed aligning nullability 
constraints w/ the table's schema
-        // Also, we promote types to the latest table schema if possible.
-        val shouldCanonicalizeSchema = 
opts.getOrDefault(CANONICALIZE_SCHEMA.key,
-          CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
-        val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
-          SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
-
-        val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
-          canonicalizeSchema(sourceSchema, latestTableSchema, opts)
-        } else {
-          AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)
-        }
-
-        val allowAutoEvolutionColumnDrop = 
opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
-          
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
-
-        if (shouldReconcileSchema) {
-          internalSchemaOpt match {
-            case Some(internalSchema) =>
-              // Apply schema evolution, by auto-merging write schema and read 
schema
-              val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
internalSchema)
-              val evolvedSchema = 
AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getFullName)
-              val shouldRemoveMetaDataFromInternalSchema = 
sourceSchema.getFields().filter(f => 
f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
-              if (shouldRemoveMetaDataFromInternalSchema) 
HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
-            case None =>
-              // In case schema reconciliation is enabled we will employ 
(legacy) reconciliation
-              // strategy to produce target writer's schema (see definition 
below)
-              val (reconciledSchema, isCompatible) =
-                reconcileSchemasLegacy(latestTableSchema, 
canonicalizedSourceSchema)
-
-              // NOTE: In some cases we need to relax constraint of incoming 
dataset's schema to be compatible
-              //       w/ the table's one and allow schemas to diverge. This 
is required in cases where
-              //       partial updates will be performed (for ex, `MERGE INTO` 
Spark SQL statement) and as such
-              //       only incoming dataset's projection has to match the 
table's schema, and not the whole one
-              if (!shouldValidateSchemasCompatibility || isCompatible) {
-                reconciledSchema
-              } else {
-                log.error(
-                  s"""Failed to reconcile incoming batch schema with the 
table's one.
-                     |Incoming schema ${sourceSchema.toString(true)}
-                     |Incoming schema (canonicalized) 
${canonicalizedSourceSchema.toString(true)}
-                     |Table's schema ${latestTableSchema.toString(true)}
-                     |""".stripMargin)
-                throw new SchemaCompatibilityException("Failed to reconcile 
incoming schema with the table's one")
-              }
-          }
-        } else {
-          // In case reconciliation is disabled, we have to validate that the 
source's schema
-          // is compatible w/ the table's latest schema, such that we're able 
to read existing table's
-          // records using [[sourceSchema]].
-          //
-          // NOTE: In some cases we need to relax constraint of incoming 
dataset's schema to be compatible
-          //       w/ the table's one and allow schemas to diverge. This is 
required in cases where
-          //       partial updates will be performed (for ex, `MERGE INTO` 
Spark SQL statement) and as such
-          //       only incoming dataset's projection has to match the table's 
schema, and not the whole one
-
-          if (mergeIntoWrites) {
-            // if its merge into writes, do not check for projection nor 
schema compatibility. Writers down the line will
-            // take care of it.
-            canonicalizedSourceSchema
-          } else {
-            if (!shouldValidateSchemasCompatibility) {
-              // if no validation is enabled, check for col drop
-              if (allowAutoEvolutionColumnDrop) {
-                canonicalizedSourceSchema
-              } else {
-                val reconciledSchema = if (setNullForMissingColumns) {
-                  
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
latestTableSchema)
-                } else {
-                  canonicalizedSourceSchema
-                }
-                if (isValidEvolutionOf(reconciledSchema, latestTableSchema)) {
-                  reconciledSchema
-                } else {
-                  log.error(
-                    s"""Incoming batch schema is not compatible with the 
table's one.
-                       |Incoming schema ${sourceSchema.toString(true)}
-                       |Incoming schema (canonicalized) 
${reconciledSchema.toString(true)}
-                       |Table's schema ${latestTableSchema.toString(true)}
-                       |""".stripMargin)
-                  throw new SchemaCompatibilityException("Incoming batch 
schema is not compatible with the table's one")
-                }
-              }
-            } else if (isSchemaCompatible(latestTableSchema, 
canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) {
-                canonicalizedSourceSchema
-            } else {
-                log.error(
-                s"""Incoming batch schema is not compatible with the table's 
one.
-                   |Incoming schema ${sourceSchema.toString(true)}
-                   |Incoming schema (canonicalized) 
${canonicalizedSourceSchema.toString(true)}
-                   |Table's schema ${latestTableSchema.toString(true)}
-                   |""".stripMargin)
-                throw new SchemaCompatibilityException("Incoming batch schema 
is not compatible with the table's one")
-              }
-            }
-        }
-    }
-  }*/
-
   /**
    * Resolve wildcards in partitions
    *
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index d07a20c68b8..42c485ea2e2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -150,7 +150,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
     extraProps.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");
     extraProps.setProperty("hoodie.datasource.write.table.type", tableType);
     extraProps.setProperty("hoodie.datasource.write.row.writer.enable", 
rowWriterEnable.toString());
-    
extraProps.setProperty(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS().key(),
 Boolean.toString(nullForDeletedCols));
+    
extraProps.setProperty(DataSourceWriteOptions.HANDLE_MISSING_COLUMNS_WITH_LOSSLESS_TYPE_PROMOTIONS().key(),
 Boolean.toString(nullForDeletedCols));
 
     //we set to 0 so that we create new base files on insert instead of adding 
inserts to existing filegroups via small file handling
     extraProps.setProperty("hoodie.parquet.small.file.limit", "0");
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 36353b445c8..db7cb54fe76 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.TestHoodieSparkUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -391,6 +392,85 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("testParamsWithSchemaTransformer")
+  public void testNonNullableColumnDrop(String tableType,
+                                Boolean rowWriterEnable,
+                                Boolean useKafkaSource,
+                                Boolean allowNullForDeletedCols,
+                                Boolean useTransformer,
+                                Boolean targetSchemaSameAsTableSchema) throws 
Exception {
+    this.tableType = tableType;
+    this.rowWriterEnable = rowWriterEnable;
+    this.useKafkaSource = useKafkaSource;
+    this.shouldCluster = false;
+    this.shouldCompact = false;
+    this.addFilegroups = false;
+    this.multiLogFiles = false;
+    this.useTransformer = useTransformer;
+    if (useKafkaSource || targetSchemaSameAsTableSchema) {
+      this.useSchemaProvider = true;
+    }
+
+    boolean isCow = tableType.equals("COPY_ON_WRITE");
+    PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + ++testNum;
+    tableBasePath = basePath + "test_parquet_table" + testNum;
+
+    //first write
+    String datapath = 
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
+    Dataset<Row> df = sparkSession.read().json(datapath);
+    df = TestHoodieSparkUtils.setColumnNotNullable(df, "rider");
+    resetTopicAndDeltaStreamer(allowNullForDeletedCols);
+    addData(df, true);
+    deltaStreamer.sync();
+    int numRecords = 6;
+    int numFiles = 3;
+    assertRecordCount(numRecords);
+    assertFileNumber(numFiles, isCow);
+
+    //add extra log files
+    if (tableType.equals("MERGE_ON_READ")) {
+      datapath = 
String.class.getResource("/data/schema-evolution/extraLogFilesTestEverything.json").getPath();
+      df = sparkSession.read().json(datapath);
+      df = TestHoodieSparkUtils.setColumnNotNullable(df, "rider");
+      addData(df, false);
+      deltaStreamer.sync();
+      //this write contains updates for the 6 records from the first write, so
+      //although we have 2 files for each filegroup, we only see the log files
+      //represented in the read. So that is why numFiles is 3, not 6
+      assertRecordCount(numRecords);
+      assertFileNumber(numFiles, false);
+    }
+
+    if (targetSchemaSameAsTableSchema) {
+      TestSchemaProvider.setTargetSchema(TestSchemaProvider.sourceSchema);
+    }
+    resetTopicAndDeltaStreamer(allowNullForDeletedCols);
+
+    HoodieStreamer.Config dsConfig = deltaStreamer.getConfig();
+    HoodieTableMetaClient metaClient = getMetaClient(dsConfig);
+    HoodieInstant lastInstant = 
metaClient.getActiveTimeline().lastInstant().get();
+
+    // drop column
+    datapath = 
String.class.getResource("/data/schema-evolution/startTestEverything.json").getPath();
+    df = sparkSession.read().json(datapath);
+    Dataset<Row> droppedColumnDf = df.drop("rider");
+    try {
+      addData(droppedColumnDf, true);
+      deltaStreamer.sync();
+      assertTrue(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
+
+      metaClient.reloadActiveTimeline();
+      Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, fs, dsConfig.targetBasePath, metaClient);
+      
assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
+          .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
+      
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("java.lang.NullPointerException")
+          || e.getMessage().contains("Incoming batch schema is not compatible 
with the table's one"));
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("testParamsWithSchemaTransformer")
   public void testTypePromotion(String tableType,

Reply via email to