This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.13.0 by this push:
new f8772de6dc4 [HUDI-5704] De-coupling column drop flag and schema validation flag (0.13.0) (#7856)
f8772de6dc4 is described below
commit f8772de6dc475dfc5aa990cb3c04606d552948e2
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Feb 5 23:10:35 2023 -0800
[HUDI-5704] De-coupling column drop flag and schema validation flag (0.13.0) (#7856)
De-couples the column drop flag from the schema validation flag; the two were tightly coupled before.
For example, if the table schema is col1, col2, col3 and the incoming batch's schema is col1, col2,
with the column drop config set to false (meaning dropping columns should not be allowed) and schema
validation also set to false, the commit would succeed. The expectation is that the commit should fail
for this new batch.
This patch fixes that case by de-coupling the two flags: the column drop flag is now honored
irrespective of whether schema validation is enabled.
Co-authored-by: Sagar Sumit <[email protected]>
---
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 17 +++++--
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 52 +++++++++++++++++-----
.../hudi/command/MergeIntoHoodieTableCommand.scala | 5 ++-
.../hudi/TestAvroSchemaResolutionSupport.scala | 20 ++++++---
.../apache/spark/sql/hudi/TestMergeIntoTable.scala | 5 ++-
5 files changed, 75 insertions(+), 24 deletions(-)
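
A minimal sketch (not part of the patch) of the scenario described in the commit message, assuming a Spark session `spark` and a table path `basePath` are in scope, and assuming the 0.13.0 option keys "hoodie.avro.schema.validate" (schema validation) and "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" (column drop):

    import org.apache.spark.sql.SaveMode
    import spark.implicits._

    // First commit: table schema is (id, userid, name).
    Seq((1, 100, "aaa")).toDF("id", "userid", "name")
      .write.format("hudi")
      .option("hoodie.table.name", "tbl")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "name")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Second commit drops "userid". With column drop disallowed, this upsert is now
    // expected to fail with SchemaCompatibilityException even though Avro schema
    // validation stays disabled.
    Seq((2, "bbb")).toDF("id", "name")
      .write.format("hudi")
      .option("hoodie.table.name", "tbl")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "name")
      .option("hoodie.avro.schema.validate", "false")
      .option("hoodie.datasource.write.schema.allow.auto.evolution.column.drop", "false")
      .mode(SaveMode.Append)
      .save(basePath)
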
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 545acdcf309..0a1bb747e1c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -59,10 +59,7 @@ public class AvroSchemaUtils {
// In case schema projection is not allowed, new schema has to have all the same fields as the
// old schema
if (!allowProjection) {
- // Check that each field in the oldSchema can be populated in the newSchema
- if (prevSchema.getFields().stream()
- .map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
- .anyMatch(Objects::isNull)) {
+ if (!canProject(prevSchema, newSchema)) {
return false;
}
}
@@ -72,6 +69,18 @@ public class AvroSchemaUtils {
return result.getType() == AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
}
+ /**
+ * Check that each field in the prevSchema can be populated in the newSchema
+ * @param prevSchema prev schema.
+ * @param newSchema new schema
+ * @return true if prev schema is a projection of new schema.
+ */
+ public static boolean canProject(Schema prevSchema, Schema newSchema) {
+ return prevSchema.getFields().stream()
+ .map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
+ .noneMatch(Objects::isNull);
+ }
+
/**
* Generates fully-qualified name for the Avro's schema based on the Table's name
*
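
As a quick, hypothetical illustration of the new helper (not part of the patch), using Avro's SchemaBuilder from Scala: a schema that merely drops a field is not a valid projection target.

    import org.apache.avro.SchemaBuilder
    import org.apache.hudi.avro.AvroSchemaUtils

    val full = SchemaBuilder.record("rec").fields()
      .requiredInt("col1").requiredInt("col2").requiredString("col3").endRecord()
    val dropped = SchemaBuilder.record("rec").fields()
      .requiredInt("col1").requiredInt("col2").endRecord()

    AvroSchemaUtils.canProject(full, dropped)   // false: col3 cannot be populated from `dropped`
    AvroSchemaUtils.canProject(dropped, full)   // true: every field of `dropped` exists in `full`
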
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c62d5a491e0..8bbab860307 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, getAv
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieWriterUtils._
-import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaCompatible}
+import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields
import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -87,6 +87,14 @@ object HoodieSparkSqlWriter {
ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
.defaultValue(true)
+ /**
+ * For merge into from spark-sql, we need some special handling. for eg, schema validation should be disabled
+ * for writes from merge into. This config is used for internal purposes.
+ */
+ val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] =
+ ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
+ .defaultValue(false)
+
private val log = LogManager.getLogger(getClass)
private var tableExists: Boolean = false
private var asyncCompactionTriggerFnDefined: Boolean = false
@@ -407,6 +415,9 @@ object HoodieSparkSqlWriter {
// in the table's one we want to proceed aligning nullability constraints w/ the table's schema
val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key, CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
+ val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
+ SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
+
val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
} else {
@@ -455,17 +466,38 @@ object HoodieSparkSqlWriter {
// w/ the table's one and allow schemas to diverge. This is required in cases where
// partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
// only incoming dataset's projection has to match the table's schema, and not the whole one
- if (!shouldValidateSchemasCompatibility || isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) {
+
+ if (mergeIntoWrites) {
+ // if it's merge into writes, do not check for projection nor schema compatibility. Writers down the line will
+ // take care of it.
canonicalizedSourceSchema
} else {
- log.error(
- s"""Incoming batch schema is not compatible with the table's one.
- |Incoming schema ${sourceSchema.toString(true)}
- |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
- |Table's schema ${latestTableSchema.toString(true)}
- |""".stripMargin)
- throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
- }
+ if (!shouldValidateSchemasCompatibility) {
+ // if no validation is enabled, check for col drop
+ // if col drop is allowed, go ahead. if not, check for projection, so that we do not allow dropping cols
+ if (allowAutoEvolutionColumnDrop || canProject(latestTableSchema, canonicalizedSourceSchema)) {
+ canonicalizedSourceSchema
+ } else {
+ log.error(
+ s"""Incoming batch schema is not compatible with the table's one.
+ |Incoming schema ${sourceSchema.toString(true)}
+ |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
+ |Table's schema ${latestTableSchema.toString(true)}
+ |""".stripMargin)
+ throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
+ }
+ } else if (isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) {
+ canonicalizedSourceSchema
+ } else {
+ log.error(
+ s"""Incoming batch schema is not compatible with the table's one.
+ |Incoming schema ${sourceSchema.toString(true)}
+ |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
+ |Table's schema ${latestTableSchema.toString(true)}
+ |""".stripMargin)
+ throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
+ }
+ }
}
}
}
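
For readability, a condensed, hypothetical restatement of the new decision flow above (names and signature simplified; the actual logic lives in HoodieSparkSqlWriter's schema deduction):

    import org.apache.avro.Schema
    import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isSchemaCompatible}
    import org.apache.hudi.exception.SchemaCompatibilityException

    def resolveWriterSchema(tableSchema: Schema,
                            incomingSchema: Schema,
                            mergeIntoWrites: Boolean,
                            validateSchemas: Boolean,
                            allowColumnDrop: Boolean): Schema = {
      if (mergeIntoWrites) {
        // MERGE INTO: projection/compatibility is handled by writers down the line
        incomingSchema
      } else if (!validateSchemas) {
        // Validation disabled: the column-drop flag is still honored via canProject
        if (allowColumnDrop || canProject(tableSchema, incomingSchema)) incomingSchema
        else throw new SchemaCompatibilityException(
          "Incoming batch schema is not compatible with the table's one")
      } else if (isSchemaCompatible(tableSchema, incomingSchema, allowColumnDrop)) {
        incomingSchema
      } else {
        throw new SchemaCompatibilityException(
          "Incoming batch schema is not compatible with the table's one")
      }
    }
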
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index ed3f2591253..54f2534b4a0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
+import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE, SQL_MERGE_INTO_WRITES}
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
@@ -532,7 +532,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
// target table, ie partially updating)
AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
RECONCILE_SCHEMA.key -> "false",
- CANONICALIZE_NULLABLE.key -> "false"
+ CANONICALIZE_NULLABLE.key -> "false",
+ SQL_MERGE_INTO_WRITES.key -> "true"
)
combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index cd6396f08fb..2b1060e90f0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -19,6 +19,7 @@
package org.apache.hudi
import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.testutils.HoodieClientTestBase
@@ -26,7 +27,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import scala.language.postfixOps
@@ -83,7 +84,8 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
.save(saveDir)
}
- def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true, shouldAllowDroppedColumns: Boolean = false): Unit = {
+ def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true, shouldAllowDroppedColumns: Boolean = false,
+ enableSchemaValidation: Boolean = HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue().toBoolean): Unit = {
var opts = if (isCow) {
commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE")
} else {
@@ -206,12 +208,18 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testDeleteColumn(isCow: Boolean): Unit = {
+ @CsvSource(value = Array(
+ "COPY_ON_WRITE,true",
+ "COPY_ON_WRITE,false",
+ "MERGE_ON_READ,true",
+ "MERGE_ON_READ,false"
+ ))
+ def testDeleteColumn(tableType: String, schemaValidationEnabled : Boolean): Unit = {
// test to delete a column
val tempRecordPath = basePath + "/record_tbl/"
val _spark = spark
import _spark.implicits._
+ val isCow = tableType.equals(HoodieTableType.COPY_ON_WRITE.name())
val df1 = Seq((1, 100, "aaa")).toDF("id", "userid", "name")
val df2 = Seq((2, "bbb")).toDF("id", "name")
@@ -231,10 +239,10 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// upsert
assertThrows(classOf[SchemaCompatibilityException]) {
- upsertData(upsertDf, tempRecordPath, isCow)
+ upsertData(upsertDf, tempRecordPath, isCow, enableSchemaValidation = schemaValidationEnabled)
}
- upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns = true)
+ upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns = true, enableSchemaValidation = schemaValidationEnabled)
// read out the table
val readDf = spark.read.format("hudi").load(tempRecordPath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 0981f74a102..99fd72d5a28 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
+import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils, ScalaAssertionSupport}
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.exception.SchemaCompatibilityException
-class TestMergeIntoTable extends HoodieSparkSqlTestBase {
+class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSupport {
test("Test MergeInto Basic") {
withRecordType()(withTempDir { tmp =>