xushiyan commented on code in PR #7857:
URL: https://github.com/apache/hudi/pull/7857#discussion_r1096631582
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -72,6 +69,18 @@ public static boolean isSchemaCompatible(Schema prevSchema,
Schema newSchema, bo
return result.getType() ==
AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
}
+ /**
+ * Check that each field in the prevSchema can be populated in the newSchema
+ * @param prevSchema prev schema.
+ * @param newSchema new schema
+ * @return true if prev schema is a projection of new schema.
+ */
+ public static boolean checkProjection(Schema prevSchema, Schema newSchema) {
Review Comment:
Better to name this canProject() so that the name implies a boolean return.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -455,7 +455,22 @@ object HoodieSparkSqlWriter {
// w/ the table's one and allow schemas to diverge. This is
required in cases where
// partial updates will be performed (for ex, `MERGE INTO`
Spark SQL statement) and as such
// only incoming dataset's projection has to match the table's
schema, and not the whole one
- if (!shouldValidateSchemasCompatibility ||
isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema,
allowAutoEvolutionColumnDrop)) {
+
+ if (!shouldValidateSchemasCompatibility) {
+ // if no validation is enabled, check for col drop
+ // if col drop is allowed, go ahead. if not, check for projection,
so that we do not allow dropping cols
+ if (allowAutoEvolutionColumnDrop ||
checkProjection(latestTableSchema, canonicalizedSourceSchema)) {
+ canonicalizedSourceSchema
Review Comment:
So, just to confirm the logic: if users explicitly allow column drop, or if
the writer schema can be projected to the table schema (no column drop), we
honor the writer schema?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala:
##########
@@ -206,12 +208,18 @@ class TestAvroSchemaResolutionSupport extends
HoodieClientTestBase with ScalaAss
}
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testDeleteColumn(isCow: Boolean): Unit = {
+ @CsvSource(value = Array(
+ "COPY_ON_WRITE,true",
+ "COPY_ON_WRITE,false",
+ "MERGE_ON_READ,true",
+ "MERGE_ON_READ,false"
+ ))
+ def testDeleteColumn(tableType: String, schemaValidationEnable : Boolean):
Unit = {
Review Comment:
nit: rename the parameter to schemaValidationEnabled
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]