This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.13.0 by this push:
     new f8772de6dc4 [HUDI-5704] De-coupling column drop flag and schema validation flag (0.13.0) (#7856)
f8772de6dc4 is described below

commit f8772de6dc475dfc5aa990cb3c04606d552948e2
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Feb 5 23:10:35 2023 -0800

    [HUDI-5704] De-coupling column drop flag and schema validation flag (0.13.0) (#7856)
    
    De-coupling the column drop flag from the schema validation flag; previously
    the two flags were tightly coupled.
    For example, if the table schema is col1, col2, col3
    and the new incoming schema is col1, col2,
    with the column drop config set to false (meaning dropping columns should not be
    allowed) and schema validation set to false, the commit succeeds. The expectation
    is that the commit should fail for this new batch.
    
    This patch fixes that use-case by de-coupling the two flags: the column drop
    flag is now honored irrespective of whether schema validation is enabled.
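    
    The following is a minimal Scala sketch of the intended rule for the example
    above (table schema col1, col2, col3; incoming batch col1, col2). It is an
    illustration only, not Hudi code, and all names in it are made up:
    
        object ColumnDropRule extends App {
          // Once schema validation is disabled, the column drop flag alone should
          // decide whether the commit is accepted.
          def commitSucceeds(allowColumnDrop: Boolean, batchKeepsAllTableColumns: Boolean): Boolean =
            allowColumnDrop || batchKeepsAllTableColumns
    
          // The incoming batch drops col3, so batchKeepsAllTableColumns = false.
          assert(!commitSucceeds(allowColumnDrop = false, batchKeepsAllTableColumns = false)) // must fail
          assert(commitSucceeds(allowColumnDrop = true, batchKeepsAllTableColumns = false))   // may succeed
        }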
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 17 +++++--
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 52 +++++++++++++++++-----
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  5 ++-
 .../hudi/TestAvroSchemaResolutionSupport.scala     | 20 ++++++---
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala |  5 ++-
 5 files changed, 75 insertions(+), 24 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 545acdcf309..0a1bb747e1c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -59,10 +59,7 @@ public class AvroSchemaUtils {
     // In case schema projection is not allowed, new schema has to have all the same fields as the
     // old schema
     if (!allowProjection) {
-      // Check that each field in the oldSchema can be populated in the newSchema
-      if (prevSchema.getFields().stream()
-          .map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
-          .anyMatch(Objects::isNull)) {
+      if (!canProject(prevSchema, newSchema)) {
         return false;
       }
     }
@@ -72,6 +69,18 @@ public class AvroSchemaUtils {
     return result.getType() == AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
   }
 
+  /**
+   * Check that each field in the prevSchema can be populated in the newSchema
+   * @param prevSchema prev schema.
+   * @param newSchema new schema
+   * @return true if prev schema is a projection of new schema.
+   */
+  public static boolean canProject(Schema prevSchema, Schema newSchema) {
+    return prevSchema.getFields().stream()
+        .map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
+        .noneMatch(Objects::isNull);
+  }
+
   /**
    * Generates fully-qualified name for the Avro's schema based on the Table's name
    *
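
The new canProject helper can be exercised directly against plain Avro schemas. The
following Scala snippet is a hypothetical usage sketch (the object name and record
schemas are made up for illustration) reproducing the commit-message example, where
the incoming schema drops col3:

    import org.apache.avro.Schema
    import org.apache.hudi.avro.AvroSchemaUtils.canProject

    object CanProjectExample extends App {
      val tableSchema = new Schema.Parser().parse(
        """{"type":"record","name":"rec","fields":[
          |  {"name":"col1","type":"int"},
          |  {"name":"col2","type":"int"},
          |  {"name":"col3","type":"string"}]}""".stripMargin)
      val incomingSchema = new Schema.Parser().parse(
        """{"type":"record","name":"rec","fields":[
          |  {"name":"col1","type":"int"},
          |  {"name":"col2","type":"int"}]}""".stripMargin)

      // col3 from the table schema cannot be looked up in the incoming schema,
      // so the table schema is not a projection of the incoming one.
      println(canProject(tableSchema, incomingSchema)) // false
      // Every incoming field resolves against the table schema.
      println(canProject(incomingSchema, tableSchema)) // true
    }
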
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c62d5a491e0..8bbab860307 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, getAv
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
 import org.apache.hudi.HoodieWriterUtils._
-import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaCompatible}
+import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields
 import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -87,6 +87,14 @@ object HoodieSparkSqlWriter {
     ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
       .defaultValue(true)
 
+  /**
+   * For merge into from spark-sql, we need some special handling. for eg, schema validation should be disabled
+   * for writes from merge into. This config is used for internal purposes.
+   */
+  val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] =
+    ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
+      .defaultValue(false)
+
   private val log = LogManager.getLogger(getClass)
   private var tableExists: Boolean = false
   private var asyncCompactionTriggerFnDefined: Boolean = false
@@ -407,6 +415,9 @@ object HoodieSparkSqlWriter {
         // in the table's one we want to proceed aligning nullability constraints w/ the table's schema
         val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key,
           CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
+        val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
+          SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
+
         val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
           AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
         } else {
@@ -455,17 +466,38 @@ object HoodieSparkSqlWriter {
           //       w/ the table's one and allow schemas to diverge. This is required in cases where
           //       partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
           //       only incoming dataset's projection has to match the table's schema, and not the whole one
-          if (!shouldValidateSchemasCompatibility || isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) {
+
+          if (mergeIntoWrites) {
+            // if its merge into writes, do not check for projection nor schema compatability. Writers down the line will
+            // take care of it.
             canonicalizedSourceSchema
           } else {
-            log.error(
-              s"""Incoming batch schema is not compatible with the table's one.
-                 |Incoming schema ${sourceSchema.toString(true)}
-                 |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
-                 |Table's schema ${latestTableSchema.toString(true)}
-                 |""".stripMargin)
-            throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
-          }
+            if (!shouldValidateSchemasCompatibility) {
+              // if no validation is enabled, check for col drop
+              // if col drop is allowed, go ahead. if not, check for projection, so that we do not allow dropping cols
+              if (allowAutoEvolutionColumnDrop || canProject(latestTableSchema, canonicalizedSourceSchema)) {
+                canonicalizedSourceSchema
+              } else {
+                log.error(
+                  s"""Incoming batch schema is not compatible with the table's one.
+                   |Incoming schema ${sourceSchema.toString(true)}
+                   |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
+                   |Table's schema ${latestTableSchema.toString(true)}
+                   |""".stripMargin)
+                throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
+              }
+            } else if (isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) {
+                canonicalizedSourceSchema
+            } else {
+                log.error(
+                s"""Incoming batch schema is not compatible with the table's one.
+                   |Incoming schema ${sourceSchema.toString(true)}
+                   |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
+                   |Table's schema ${latestTableSchema.toString(true)}
+                   |""".stripMargin)
+                throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
+              }
+            }
         }
     }
   }
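
For readability, the new branching above can be condensed as in the following Scala
sketch. It is a paraphrase of the patch, not the literal implementation: canProject,
isSchemaCompatible, and SchemaCompatibilityException are the real helpers used in
the diff, while the object, method, and parameter names are invented and logging is
elided:

    import org.apache.avro.Schema
    import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isSchemaCompatible}
    import org.apache.hudi.exception.SchemaCompatibilityException

    object SchemaDecisionSketch {
      def resolveWriteSchema(mergeIntoWrites: Boolean,
                             validateSchemas: Boolean,
                             allowColumnDrop: Boolean,
                             tableSchema: Schema,
                             incomingSchema: Schema): Schema = {
        def incompatible() = throw new SchemaCompatibilityException(
          "Incoming batch schema is not compatible with the table's one")

        if (mergeIntoWrites) {
          // MERGE INTO writes: defer projection/compatibility checks to downstream writers
          incomingSchema
        } else if (!validateSchemas) {
          // Validation disabled: the column drop flag is still honored via the projection check
          if (allowColumnDrop || canProject(tableSchema, incomingSchema)) incomingSchema else incompatible()
        } else if (isSchemaCompatible(tableSchema, incomingSchema, allowColumnDrop)) {
          incomingSchema
        } else {
          incompatible()
        }
      }
    }
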
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index ed3f2591253..54f2534b4a0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
+import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE, SQL_MERGE_INTO_WRITES}
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
@@ -532,7 +532,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       //       target table, ie partially updating)
       AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
       RECONCILE_SCHEMA.key -> "false",
-      CANONICALIZE_NULLABLE.key -> "false"
+      CANONICALIZE_NULLABLE.key -> "false",
+      SQL_MERGE_INTO_WRITES.key -> "true"
     )
 
     combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
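
As a usage note: any Spark SQL MERGE INTO on a Hudi table goes through this command,
which now also passes the internal option hoodie.internal.sql.merge.into.writes=true
to the writer, so the projection/compatibility check above is skipped for such
partial-update writes. A hypothetical example (table and source names are made up),
assuming an active SparkSession named spark:

    spark.sql(
      """
        |MERGE INTO hudi_target t
        |USING source_updates s
        |ON t.id = s.id
        |WHEN MATCHED THEN UPDATE SET t.price = s.price
        |WHEN NOT MATCHED THEN INSERT *
        |""".stripMargin)
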
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index cd6396f08fb..2b1060e90f0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -19,6 +19,7 @@
 package org.apache.hudi
 
 import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.SchemaCompatibilityException
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -26,7 +27,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
 import scala.language.postfixOps
 
@@ -83,7 +84,8 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
       .save(saveDir)
   }
 
-  def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true, shouldAllowDroppedColumns: Boolean = false): Unit = {
+  def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true, shouldAllowDroppedColumns: Boolean = false,
+                 enableSchemaValidation: Boolean = HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue().toBoolean): Unit = {
     var opts = if (isCow) {
       commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE")
     } else {
@@ -206,12 +208,18 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testDeleteColumn(isCow: Boolean): Unit = {
+  @CsvSource(value = Array(
+    "COPY_ON_WRITE,true",
+    "COPY_ON_WRITE,false",
+    "MERGE_ON_READ,true",
+    "MERGE_ON_READ,false"
+  ))
+  def testDeleteColumn(tableType: String, schemaValidationEnabled : Boolean): Unit = {
     // test to delete a column
     val tempRecordPath = basePath + "/record_tbl/"
     val _spark = spark
     import _spark.implicits._
+    val isCow = tableType.equals(HoodieTableType.COPY_ON_WRITE.name())
 
     val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
     val df2 = Seq((2, "bbb")).toDF("id", "name")
@@ -231,10 +239,10 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
 
     // upsert
     assertThrows(classOf[SchemaCompatibilityException]) {
-      upsertData(upsertDf, tempRecordPath, isCow)
+      upsertData(upsertDf, tempRecordPath, isCow, enableSchemaValidation = schemaValidationEnabled)
     }
 
-    upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns = true)
+    upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns = true, enableSchemaValidation = schemaValidationEnabled)
 
     // read out the table
     val readDf = spark.read.format("hudi").load(tempRecordPath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 0981f74a102..99fd72d5a28 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
+import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils, ScalaAssertionSupport}
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.exception.SchemaCompatibilityException
 
-class TestMergeIntoTable extends HoodieSparkSqlTestBase {
+class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSupport {
 
   test("Test MergeInto Basic") {
     withRecordType()(withTempDir { tmp =>
