This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b3fa99685ed923045bf966b0fc5b273a881e157a
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Feb 1 18:13:25 2023 +0530

    [HUDI-5646] Guard dropping columns by a config, do not allow by default 
(#7787)
    
    * [HUDI-5646] Guard dropping columns by a config, do not allow by default
    
    * Replaced superfluous `isSchemaCompatible` override by explicitly 
specifying whether column drop should be allowed;
    
    * Revisited `HoodieSparkSqlWriter` to avoid (unnecessary) schema handling 
for delete operations
    
    * Remove meta-fields from latest table schema during analysis
    
    * Disable schema validation when partition columns are dropped
    
    ---------
    
    Co-authored-by: Alexey Kudinkin <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  18 +++
 .../java/org/apache/hudi/table/HoodieTable.java    |  12 +-
 .../hudi/client/TestTableSchemaEvolution.java      | 125 +++++++++++++--------
 .../hudi/testutils/HoodieClientTestUtils.java      |  19 +++-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |  23 +++-
 .../hudi/common/table/TableSchemaResolver.java     |  36 ------
 .../org/apache/hudi/common/util/ParquetUtils.java  |  34 +++---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  68 +++++------
 .../AlterHoodieTableChangeColumnCommand.scala      |   2 +-
 .../hudi/TestAvroSchemaResolutionSupport.scala     |  19 +++-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  49 +++++---
 .../hudi/functional/TestBasicSchemaEvolution.scala |  21 +++-
 .../hudi/functional/TestCOWDataSourceStorage.scala |  10 +-
 .../hudi/functional/TestColumnStatsIndex.scala     |   7 +-
 14 files changed, 268 insertions(+), 175 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e187fff4483..e6525a2b1dc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -227,6 +227,15 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue("true")
       .withDocumentation("Validate the schema used for the write against the 
latest schema, for backwards compatibility.");
 
+  public static final ConfigProperty<String> 
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP = ConfigProperty
+      .key("hoodie.datasource.write.schema.allow.auto.evolution.column.drop")
+      .defaultValue("false")
+      .sinceVersion("0.13.0")
+      .withDocumentation("Controls whether table's schema is allowed to 
automatically evolve when "
+          + "incoming batch's schema can have any of the columns dropped. By 
default, Hudi will not "
+          + "allow this kind of (auto) schema evolution. Set this config to 
true to allow table's "
+          + "schema to be updated automatically when columns are dropped from 
the new incoming batch.");
+
   public static final ConfigProperty<String> INSERT_PARALLELISM_VALUE = 
ConfigProperty
       .key("hoodie.insert.shuffle.parallelism")
       .defaultValue("0")
@@ -1086,6 +1095,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(AVRO_SCHEMA_VALIDATE_ENABLE);
   }
 
+  public boolean shouldAllowAutoEvolutionColumnDrop() {
+    return getBooleanOrDefault(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP);
+  }
+
   public String getTableName() {
     return getString(TBL_NAME);
   }
@@ -2451,6 +2464,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAllowAutoEvolutionColumnDrop(boolean 
shouldAllowDroppedColumns) {
+      writeConfig.setValue(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, 
String.valueOf(shouldAllowDroppedColumns));
+      return this;
+    }
+
     public Builder forTable(String tableName) {
       writeConfig.setValue(TBL_NAME, tableName);
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index af8d8d23261..591ebc430dc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -800,7 +800,7 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    */
   private void validateSchema() throws HoodieUpsertException, 
HoodieInsertException {
 
-    if (!config.shouldValidateAvroSchema() || 
getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
+    if (!shouldValidateAvroSchema() || 
getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
       // Check not required
       return;
     }
@@ -812,7 +812,7 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
       TableSchemaResolver schemaResolver = new 
TableSchemaResolver(getMetaClient());
       writerSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
       tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchemaWithoutMetadataFields());
-      isValid = isSchemaCompatible(tableSchema, writerSchema);
+      isValid = isSchemaCompatible(tableSchema, writerSchema, 
config.shouldAllowAutoEvolutionColumnDrop());
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for 
base path " + metaClient.getBasePath(), e);
     }
@@ -1010,4 +1010,12 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
   public Runnable getPreExecuteRunnable() {
     return Functions.noop();
   }
+
+  private boolean shouldValidateAvroSchema() {
+    // TODO(HUDI-4772) re-enable validations in case partition columns
+    //                 being dropped from the data-file after fixing the write 
schema
+    Boolean shouldDropPartitionColumns = 
metaClient.getTableConfig().shouldDropPartitionColumns();
+
+    return config.shouldValidateAvroSchema() && !shouldDropPartitionColumns;
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index f778c7cceac..686563ebfdd 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -34,10 +32,17 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.List;
@@ -79,36 +84,35 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
 
   @Test
   public void testSchemaCompatibilityBasic() {
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA, 
false),
         "Same schema is compatible");
 
     String reorderedSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + 
TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA
         + MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX;
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema, false),
         "Reordered fields are compatible");
-    assertTrue(isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA),
+    assertTrue(isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA, false),
         "Reordered fields are compatible");
 
     String renamedSchema = TRIP_EXAMPLE_SCHEMA.replace("tip_history", 
"tip_future");
 
-    // NOTE: That even though renames could be carried over as "column drop" 
and "column add"
-    //       both of which are legitimate operations, no data carry-over will 
occur (exactly b/c
-    //       it's an old column being dropped, and the new one being added)
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema, false),
+        "Renaming fields is essentially: dropping old field, created a new 
one");
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema, true),
         "Renaming fields is essentially: dropping old field, created a new 
one");
 
     String renamedRecordSchema = TRIP_EXAMPLE_SCHEMA.replace("triprec", 
"triprec_renamed");
-    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema, 
false),
         "Renamed record name is not compatible");
 
     String swappedFieldSchema = TRIP_SCHEMA_PREFIX + 
MAP_TYPE_SCHEMA.replace("city_to_state", "fare")
         + FARE_NESTED_SCHEMA.replace("fare", "city_to_state") + 
TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
-    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema, 
false),
         "Swapped fields are not compatible");
 
     String typeChangeSchemaDisallowed = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + 
FARE_NESTED_SCHEMA
         + TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
-    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
typeChangeSchemaDisallowed),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
typeChangeSchemaDisallowed, false),
         "Incompatible field type change is not allowed");
 
     // Array of allowed schema field type transitions
@@ -119,10 +123,10 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     for (String[] fieldChange : allowedFieldChanges) {
       String fromSchema = TRIP_SCHEMA_PREFIX + 
EXTRA_FIELD_SCHEMA.replace("string", fieldChange[0]) + TRIP_SCHEMA_SUFFIX;
       String toSchema = TRIP_SCHEMA_PREFIX + 
EXTRA_FIELD_SCHEMA.replace("string", fieldChange[1]) + TRIP_SCHEMA_SUFFIX;
-      assertTrue(isSchemaCompatible(fromSchema, toSchema),
+      assertTrue(isSchemaCompatible(fromSchema, toSchema, false),
           "Compatible field type change is not allowed");
       if (!fieldChange[0].equals("byte") && fieldChange[1].equals("byte")) {
-        assertFalse(isSchemaCompatible(toSchema, fromSchema),
+        assertFalse(isSchemaCompatible(toSchema, fromSchema, false),
             "Incompatible field type change is allowed");
       }
     }
@@ -130,32 +134,31 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     // Names and aliases should match
     String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA + 
TRIP_SCHEMA_SUFFIX;
     String toSchema = TRIP_SCHEMA_PREFIX.replace("triprec", "new_triprec") + 
EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
-    assertFalse(isSchemaCompatible(fromSchema, toSchema), "Field names should 
match");
-    assertFalse(isSchemaCompatible(toSchema, fromSchema), "Field names should 
match");
+    assertFalse(isSchemaCompatible(fromSchema, toSchema, false), "Field names 
should match");
+    assertFalse(isSchemaCompatible(toSchema, fromSchema, false), "Field names 
should match");
 
 
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, false),
         "Added field with default is compatible (Evolved Schema)");
 
     String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + 
MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
         + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + 
EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
         + TRIP_SCHEMA_SUFFIX;
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
multipleAddedFieldSchema),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
multipleAddedFieldSchema, false),
         "Multiple added fields with defaults are compatible");
 
-    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
-        TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
-            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + 
EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX),
+    assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + 
EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + 
EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX, false),
         "Added field without default and not nullable is not compatible 
(Evolved Schema)");
 
-    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
-        TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
-            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + 
EXTRA_FIELD_NULLABLE_SCHEMA),
+    assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + 
EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + 
EXTRA_FIELD_NULLABLE_SCHEMA, false),
         "Added nullable field is compatible (Evolved Schema)");
   }
 
-  @Test
-  public void testMORTable() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testMORTable(boolean shouldAllowDroppedColumns) throws Exception 
{
     tableType = HoodieTableType.MERGE_ON_READ;
 
     // Create the table
@@ -165,7 +168,7 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
       .setTimelineLayoutVersion(VERSION_1)
       .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
-    HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
+    HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA, 
shouldAllowDroppedColumns);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Initial inserts with TRIP_EXAMPLE_SCHEMA
@@ -194,20 +197,26 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     checkReadRecords("000", numRecords);
 
     // Insert with evolved schema (column dropped) is allowed
-    HoodieWriteConfig hoodieDevolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
+    HoodieWriteConfig hoodieDevolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, 
shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
     final List<HoodieRecord> failedRecords = generateInsertsWithSchema("005", 
numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
     // We cannot use insertBatch directly here because we want to insert 
records
     // with a evolved schema and insertBatch inserts records using the 
TRIP_EXAMPLE_SCHEMA.
-    writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
-        (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, 
false, numRecords, 2 * numRecords, 5, false);
+    try {
+      writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, 
false, numRecords, 2 * numRecords, 5, false);
+      assertTrue(shouldAllowDroppedColumns);
+    } catch (HoodieInsertException e) {
+      assertFalse(shouldAllowDroppedColumns);
+      return;
+    }
 
-    // Update with evolved schema (column dropped) is allowed
+    // Update with evolved schema (column dropped) might be allowed depending 
on config set.
     updateBatch(hoodieDevolvedWriteConfig, client, "006", "005", 
Option.empty(),
                 initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, 
false, false, numUpdateRecords, 2 * numRecords, 0);
 
     // Insert with an evolved scheme is allowed
-    HoodieWriteConfig hoodieEvolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
+    HoodieWriteConfig hoodieEvolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, 
shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
 
     // We cannot use insertBatch directly here because we want to insert 
records
@@ -230,19 +239,28 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
 
     // Now try updating w/ the original schema (should succeed)
     client = getHoodieWriteClient(hoodieWriteConfig);
-    updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
-                initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, 
false, false, numUpdateRecords, 4 * numRecords, 9);
+    try {
+      updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
+          initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, 
false, false, numUpdateRecords, 4 * numRecords, 9);
+      assertTrue(shouldAllowDroppedColumns);
+    } catch (HoodieUpsertException e) {
+      assertFalse(shouldAllowDroppedColumns);
+    }
   }
 
-  @Test
-  public void testCopyOnWriteTable() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testCopyOnWriteTable(boolean shouldAllowDroppedColumns) throws 
Exception {
     // Create the table
     HoodieTableMetaClient.withPropertyBuilder()
       .fromMetaClient(metaClient)
       .setTimelineLayoutVersion(VERSION_1)
       .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
-    HoodieWriteConfig hoodieWriteConfig = 
getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA).withRollbackUsingMarkers(false).build();
+    HoodieWriteConfig hoodieWriteConfig = 
getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+        .withRollbackUsingMarkers(false)
+        .withAllowAutoEvolutionColumnDrop(shouldAllowDroppedColumns)
+        .build();
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Initial inserts with TRIP_EXAMPLE_SCHEMA
@@ -266,11 +284,17 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     checkReadRecords("000", numRecords);
 
     // Inserting records w/ new evolved schema (w/ tip column dropped)
-    HoodieWriteConfig hoodieDevolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
+    HoodieWriteConfig hoodieDevolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, 
shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
     final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", 
numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
-    writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
-        (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, 
true, numRecords, numRecords * 2, 1, false);
+    try {
+      writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, 
true, numRecords, numRecords * 2, 1, false);
+      assertTrue(shouldAllowDroppedColumns);
+    } catch (HoodieInsertException e) {
+      assertFalse(shouldAllowDroppedColumns);
+      return;
+    }
 
     // Updating records w/ new evolved schema
     updateBatch(hoodieDevolvedWriteConfig, client, "005", "004", 
Option.empty(),
@@ -278,7 +302,7 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
                 numUpdateRecords, 2 * numRecords, 5);
 
     // Inserting with evolved schema is allowed
-    HoodieWriteConfig hoodieEvolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
+    HoodieWriteConfig hoodieEvolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, 
shouldAllowDroppedColumns);
     client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
     final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("006", 
numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
     // We cannot use insertBatch directly here because we want to insert 
records
@@ -299,9 +323,14 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
 
     // Now try updating w/ the original schema (should succeed)
     client = getHoodieWriteClient(hoodieWriteConfig);
-    updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
-                initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, 
false, true,
-                numUpdateRecords, 3 * numRecords, 8);
+    try {
+      updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
+          initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, 
false, true,
+          numUpdateRecords, 3 * numRecords, 8);
+      assertTrue(shouldAllowDroppedColumns);
+    } catch (HoodieUpsertException e) {
+      assertFalse(shouldAllowDroppedColumns);
+    }
   }
 
   private void checkReadRecords(String instantTime, int numExpectedRecords) 
throws IOException {
@@ -362,8 +391,8 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     }).collect(Collectors.toList());
   }
 
-  private HoodieWriteConfig getWriteConfig(String schema) {
-    return getWriteConfigBuilder(schema).build();
+  private HoodieWriteConfig getWriteConfig(String schema, boolean 
shouldAllowDroppedColumns) {
+    return 
getWriteConfigBuilder(schema).withAllowAutoEvolutionColumnDrop(shouldAllowDroppedColumns).build();
   }
 
   private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
@@ -373,8 +402,8 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
         .withAvroSchemaValidate(true);
   }
 
-  private static boolean isSchemaCompatible(String oldSchema, String 
newSchema) {
-    return AvroSchemaUtils.isSchemaCompatible(new 
Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
+  private static boolean isSchemaCompatible(String oldSchema, String 
newSchema, boolean shouldAllowDroppedColumns) {
+    return AvroSchemaUtils.isSchemaCompatible(new 
Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema), 
shouldAllowDroppedColumns);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 609fdb0bd5c..09447965b2c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -37,6 +37,7 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.HoodieHFileUtils;
@@ -96,11 +97,17 @@ public class HoodieClientTestUtils {
         .setMaster("local[4]")
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
-        .set("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
         .set("spark.sql.shuffle.partitions", "4")
         .set("spark.default.parallelism", "4");
 
-    if (HoodieSparkUtils.gteqSpark3_2()) {
+    // NOTE: This utility is used in modules where this class might not be 
present, therefore
+    //       to avoid littering output w/ [[ClassNotFoundException]]s we will 
skip adding it
+    //       in case this utility is used in the module not providing it
+    if (canLoadClass("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")) 
{
+      sparkConf.set("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+    }
+
+    if (canLoadClass("org.apache.spark.sql.hudi.catalog.HoodieCatalog") && 
HoodieSparkUtils.gteqSpark3_2()) {
       sparkConf.set("spark.sql.catalog.spark_catalog",
           "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
     }
@@ -326,4 +333,12 @@ public class HoodieClientTestUtils {
       throw new HoodieException("Failed to read schema from commit metadata", 
e);
     }
   }
+
+  private static boolean canLoadClass(String className) {
+    try {
+      return ReflectionUtils.getClass(className) != null;
+    } catch (Exception e) {
+      return false;
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 395fc100bf1..545acdcf309 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.avro;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
 
 import java.util.List;
 import java.util.Objects;
@@ -36,10 +37,10 @@ public class AvroSchemaUtils {
   private AvroSchemaUtils() {}
 
   /**
-   * See {@link #isSchemaCompatible(Schema, Schema, boolean)} doc for more 
details
+   * See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for 
more details
    */
-  public static boolean isSchemaCompatible(Schema prevSchema, Schema 
newSchema) {
-    return isSchemaCompatible(prevSchema, newSchema, true);
+  public static boolean isSchemaCompatible(Schema prevSchema, Schema 
newSchema, boolean allowProjection) {
+    return isSchemaCompatible(prevSchema, newSchema, true, allowProjection);
   }
 
   /**
@@ -50,10 +51,22 @@ public class AvroSchemaUtils {
    * @param newSchema new instance of the schema
    * @param checkNaming controls whether schemas fully-qualified names should 
be checked
    */
-  public static boolean isSchemaCompatible(Schema prevSchema, Schema 
newSchema, boolean checkNaming) {
+  public static boolean isSchemaCompatible(Schema prevSchema, Schema 
newSchema, boolean checkNaming, boolean allowProjection) {
     // NOTE: We're establishing compatibility of the {@code prevSchema} and 
{@code newSchema}
     //       as following: {@code newSchema} is considered compatible to 
{@code prevSchema},
     //       iff data written using {@code prevSchema} could be read by {@code 
newSchema}
+
+    // In case schema projection is not allowed, new schema has to have all 
the same fields as the
+    // old schema
+    if (!allowProjection) {
+      // Check that each field in the oldSchema can be populated in the 
newSchema
+      if (prevSchema.getFields().stream()
+          .map(oldSchemaField -> 
SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
+          .anyMatch(Objects::isNull)) {
+        return false;
+      }
+    }
+
     AvroSchemaCompatibility.SchemaPairCompatibility result =
         AvroSchemaCompatibility.checkReaderWriterCompatibility(newSchema, 
prevSchema, checkNaming);
     return result.getType() == 
AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
@@ -88,7 +101,7 @@ public class AvroSchemaUtils {
   private static boolean isAtomicSchemasCompatible(Schema oneAtomicType, 
Schema anotherAtomicType) {
     // NOTE: Checking for compatibility of atomic types, we should ignore their
     //       corresponding fully-qualified names (as irrelevant)
-    return isSchemaCompatible(oneAtomicType, anotherAtomicType, false);
+    return isSchemaCompatible(oneAtomicType, anotherAtomicType, false, true);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 4eddd6df031..03ed542bd60 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -31,7 +30,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Functions.Function1;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -315,40 +313,6 @@ public class TableSchemaResolver {
     return Option.empty();
   }
 
-  /**
-   * Get latest schema either from incoming schema or table schema.
-   * @param writeSchema incoming batch's write schema.
-   * @param convertTableSchemaToAddNamespace {@code true} if table schema 
needs to be converted. {@code false} otherwise.
-   * @param converterFn converter function to be called over table schema (to 
add namespace may be). Each caller can decide if any conversion is required.
-   * @return the latest schema.
-   *
-   * @deprecated will be removed (HUDI-4472)
-   */
-  @Deprecated
-  public Schema getLatestSchema(Schema writeSchema, boolean 
convertTableSchemaToAddNamespace,
-      Function1<Schema, Schema> converterFn) {
-    Schema latestSchema = writeSchema;
-    try {
-      if (metaClient.isTimelineNonEmpty()) {
-        Schema tableSchema = getTableAvroSchemaWithoutMetadataFields();
-        if (convertTableSchemaToAddNamespace && converterFn != null) {
-          tableSchema = converterFn.apply(tableSchema);
-        }
-        if (writeSchema.getFields().size() < tableSchema.getFields().size() && 
AvroSchemaUtils.isSchemaCompatible(writeSchema, tableSchema)) {
-          // if incoming schema is a subset (old schema) compared to table 
schema. For eg, one of the
-          // ingestion pipeline is still producing events in old schema
-          latestSchema = tableSchema;
-          LOG.debug("Using latest table schema to rewrite incoming records " + 
tableSchema.toString());
-        }
-      }
-    } catch (IllegalArgumentException | InvalidTableException e) {
-      LOG.warn("Could not find any commits, falling back to using incoming 
batch's write schema");
-    } catch (Exception e) {
-      LOG.warn("Unknown exception thrown " + e.getMessage() + ", Falling back 
to using incoming batch's write schema");
-    }
-    return latestSchema;
-  }
-
   private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) 
throws IOException {
     LOG.info("Reading schema from " + parquetFilePath);
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index bc736090d5e..d0ef867a2d4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -37,6 +37,7 @@ import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -314,20 +315,25 @@ public class ParquetUtils extends BaseFileUtils {
           .flatMap(blockMetaData ->
               blockMetaData.getColumns().stream()
                 .filter(f -> cols.contains(f.getPath().toDotString()))
-                .map(columnChunkMetaData ->
-                    HoodieColumnRangeMetadata.<Comparable>create(
-                        parquetFilePath.getName(),
-                        columnChunkMetaData.getPath().toDotString(),
-                        convertToNativeJavaType(
-                            columnChunkMetaData.getPrimitiveType(),
-                            
columnChunkMetaData.getStatistics().genericGetMin()),
-                        convertToNativeJavaType(
-                            columnChunkMetaData.getPrimitiveType(),
-                            
columnChunkMetaData.getStatistics().genericGetMax()),
-                        columnChunkMetaData.getStatistics().getNumNulls(),
-                        columnChunkMetaData.getValueCount(),
-                        columnChunkMetaData.getTotalSize(),
-                        columnChunkMetaData.getTotalUncompressedSize()))
+                .map(columnChunkMetaData -> {
+                  Statistics stats = columnChunkMetaData.getStatistics();
+                  return HoodieColumnRangeMetadata.<Comparable>create(
+                      parquetFilePath.getName(),
+                      columnChunkMetaData.getPath().toDotString(),
+                      convertToNativeJavaType(
+                          columnChunkMetaData.getPrimitiveType(),
+                          stats.genericGetMin()),
+                      convertToNativeJavaType(
+                          columnChunkMetaData.getPrimitiveType(),
+                          stats.genericGetMax()),
+                      // NOTE: In case when column contains only nulls Parquet 
won't be creating
+                      //       stats for it instead returning stubbed (empty) 
object. In that case
+                      //       we have to equate number of nulls to the value 
count ourselves
+                      stats.isEmpty() ? columnChunkMetaData.getValueCount() : 
stats.getNumNulls(),
+                      columnChunkMetaData.getValueCount(),
+                      columnChunkMetaData.getTotalSize(),
+                      columnChunkMetaData.getTotalUncompressedSize());
+                })
           )
           .collect(groupingByCollector);
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0092019dad5..764f9474ee0 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -26,7 +26,8 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, 
isSchemaCompatible}
-import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, 
HoodieConfig, HoodieMetadataConfig, TypedProperties}
@@ -230,31 +231,6 @@ object HoodieSparkSqlWriter {
         }
       }
 
-      // NOTE: Target writer's schema is deduced based on
-      //         - Source's schema
-      //         - Existing table's schema (including its Hudi's 
[[InternalSchema]] representation)
-      val writerSchema = deduceWriterSchema(sourceSchema, 
latestTableSchemaOpt, internalSchemaOpt, parameters)
-
-      validateSchemaForHoodieIsDeleted(writerSchema)
-
-      // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING THIS
-      //       We have to register w/ Kryo all of the Avro schemas that might 
potentially be used to decode
-      //       records into Avro format. Otherwise, Kryo wouldn't be able to 
apply an optimization allowing
-      //       it to avoid the need to ser/de the whole schema along _every_ 
Avro record
-      val targetAvroSchemas = sourceSchema +: writerSchema +: 
latestTableSchemaOpt.toSeq
-      registerAvroSchemasWithKryo(sparkContext, targetAvroSchemas: _*)
-
-      log.info(s"Registered Avro schemas: 
${targetAvroSchemas.map(_.toString(true)).mkString("\n")}")
-
-      // Short-circuit if bulk_insert via row is enabled.
-      // scalastyle:off
-      if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == 
WriteOperationType.BULK_INSERT) {
-        val (success, commitTime: common.util.Option[String]) = 
bulkInsertAsRow(sqlContext, hoodieConfig, df, tblName,
-          basePath, path, instantTime, writerSchema, 
tableConfig.isTablePartitioned)
-        return (success, commitTime, common.util.Option.empty(), 
common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
-      }
-      // scalastyle:on
-
       val (writeResult, writeClient: SparkRDDWriteClient[_]) =
         operation match {
           case WriteOperationType.DELETE =>
@@ -312,8 +288,24 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = 
DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, 
instantTime)
             (writeStatuses, client)
+
+          // Here all other (than DELETE, DELETE_PARTITION) write operations 
are handled
           case _ =>
-            // Here all other (than DELETE, DELETE_PARTITION) write operations 
are handled
+            // NOTE: Target writer's schema is deduced based on
+            //         - Source's schema
+            //         - Existing table's schema (including its Hudi's 
[[InternalSchema]] representation)
+            val writerSchema = deduceWriterSchema(sourceSchema, 
latestTableSchemaOpt, internalSchemaOpt, parameters)
+
+            validateSchemaForHoodieIsDeleted(writerSchema)
+
+            // Short-circuit if bulk_insert via row is enabled.
+            // scalastyle:off
+            if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == 
WriteOperationType.BULK_INSERT) {
+              val (success, commitTime: common.util.Option[String]) = 
bulkInsertAsRow(sqlContext, hoodieConfig, df, tblName,
+                basePath, path, instantTime, writerSchema, 
tableConfig.isTablePartitioned)
+              return (success, commitTime, common.util.Option.empty(), 
common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
+            }
+            // scalastyle:on
 
             // Check whether partition columns should be persisted w/in the 
data-files, or should
             // be instead omitted from them and simply encoded into the 
partition path (which is Spark's
@@ -404,7 +396,11 @@ object HoodieSparkSqlWriter {
       // writer's schema. No additional handling is required
       case None => sourceSchema
       // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
-      case Some(latestTableSchema) =>
+      case Some(latestTableSchemaWithMetaFields) =>
+        // NOTE: Meta-fields will be unconditionally injected by Hudi writing 
handles, for the sake of
+        //       deducing proper writer schema we're stripping them to make 
sure we can perform proper
+        //       analysis
+        val latestTableSchema = 
removeMetadataFields(latestTableSchemaWithMetaFields)
         // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
         // relative to the table's one, by doing a (minor) reconciliation of 
the nullability constraints:
         // for ex, if in incoming schema column A is designated as non-null, 
but it's designated as nullable
@@ -417,6 +413,9 @@ object HoodieSparkSqlWriter {
           sourceSchema
         }
 
+        val allowAutoEvolutionColumnDrop = 
opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
+          
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
+
         if (shouldReconcileSchema) {
           internalSchemaOpt match {
             case Some(internalSchema) =>
@@ -428,7 +427,8 @@ object HoodieSparkSqlWriter {
             case None =>
               // In case schema reconciliation is enabled we will employ 
(legacy) reconciliation
               // strategy to produce target writer's schema (see definition 
below)
-              val (reconciledSchema, isCompatible) = 
reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
+              val (reconciledSchema, isCompatible) =
+                reconcileSchemasLegacy(latestTableSchema, 
canonicalizedSourceSchema)
 
               // NOTE: In some cases we need to relax constraint of incoming 
dataset's schema to be compatible
               //       w/ the table's one and allow schemas to diverge. This 
is required in cases where
@@ -455,7 +455,7 @@ object HoodieSparkSqlWriter {
           //       w/ the table's one and allow schemas to diverge. This is 
required in cases where
           //       partial updates will be performed (for ex, `MERGE INTO` 
Spark SQL statement) and as such
           //       only incoming dataset's projection has to match the table's 
schema, and not the whole one
-          if (!shouldValidateSchemasCompatibility || 
AvroSchemaUtils.isSchemaCompatible(latestTableSchema, 
canonicalizedSourceSchema)) {
+          if (!shouldValidateSchemasCompatibility || 
isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, 
allowAutoEvolutionColumnDrop)) {
             canonicalizedSourceSchema
           } else {
             log.error(
@@ -550,14 +550,18 @@ object HoodieSparkSqlWriter {
     // the other one (schema A contains schema B, if schema B is a projection 
of A). This enables us,
     // to always "extend" the schema during schema evolution and hence never 
lose the data (when, for ex
     // existing column is being dropped in a new batch)
+    //
+    // NOTE: By default Hudi doesn't allow automatic schema evolution to drop 
the columns from the target
+    //       table. However, when schema reconciliation is turned on, we would 
allow columns to be dropped
+    //       in the incoming batch (as these would be reconciled anyway)
     if (isCompatibleProjectionOf(tableSchema, newSchema)) {
       // Picking table schema as a writer schema we need to validate that we'd 
be able to
       // rewrite incoming batch's data (written in new schema) into it
-      (tableSchema, isSchemaCompatible(newSchema, tableSchema))
+      (tableSchema, isSchemaCompatible(newSchema, tableSchema, true))
     } else {
       // Picking new schema as a writer schema we need to validate that we'd 
be able to
       // rewrite table's data into it
-      (newSchema, isSchemaCompatible(tableSchema, newSchema))
+      (newSchema, isSchemaCompatible(tableSchema, newSchema, true))
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
index dd2aae06bce..cb5a3f6fa75 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala
@@ -97,7 +97,7 @@ case class AlterHoodieTableChangeColumnCommand(
   private def validateSchema(newSchema: Schema, metaClient: 
HoodieTableMetaClient): Unit = {
     val schemaUtil = new TableSchemaResolver(metaClient)
     val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
-    if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema)) {
+    if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema, true)) {
       throw new HoodieException("Failed schema compatibility check for 
newSchema :" + newSchema +
         ", origin table schema :" + tableSchema + ", base path :" + 
metaClient.getBasePath)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index ad476fb38f3..cd6396f08fb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.SchemaCompatibilityException
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -33,7 +34,7 @@ import scala.language.postfixOps
  * Test cases to validate Hudi's support for writing and reading when evolving 
schema implicitly via Avro's Schema Resolution
  * Note: Test will explicitly write into different partitions to ensure that a 
Hudi table will have multiple filegroups with different schemas.
  */
-class TestAvroSchemaResolutionSupport extends HoodieClientTestBase {
+class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with 
ScalaAssertionSupport {
 
   var spark: SparkSession = _
   val commonOpts: Map[String, String] = Map(
@@ -82,12 +83,13 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase {
       .save(saveDir)
   }
 
-  def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true): Unit 
= {
-    val opts = if (isCow) {
+  def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true, 
shouldAllowDroppedColumns: Boolean = false): Unit = {
+    var opts = if (isCow) {
       commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"COPY_ON_WRITE")
     } else {
       commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
"MERGE_ON_READ")
     }
+    opts = opts ++ 
Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> 
shouldAllowDroppedColumns.toString)
 
     df.write.format("hudi")
       .options(opts)
@@ -228,7 +230,11 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase {
     upsertDf.show(false)
 
     // upsert
-    upsertData(upsertDf, tempRecordPath, isCow)
+    assertThrows(classOf[SchemaCompatibilityException]) {
+      upsertData(upsertDf, tempRecordPath, isCow)
+    }
+
+    upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns = 
true)
 
     // read out the table
     val readDf = spark.read.format("hudi").load(tempRecordPath)
@@ -776,7 +782,10 @@ class TestAvroSchemaResolutionSupport extends 
HoodieClientTestBase {
     df6 = df6.withColumn("userid", df6.col("userid").cast("float"))
     df6.printSchema()
     df6.show(false)
-    upsertData(df6, tempRecordPath)
+    assertThrows(classOf[SchemaCompatibilityException]) {
+      upsertData(df6, tempRecordPath)
+    }
+    upsertData(df6, tempRecordPath, shouldAllowDroppedColumns = true)
 
     // 7. Rearrange column position
     var df7 = Seq((7, "newcol1", 700, newPartition)).toDF("id", "newcol1", 
"userid", "name")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 657c7762c38..232883b71be 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, 
HoodieWriteConfig}
-import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.exception.{HoodieException, 
SchemaCompatibilityException}
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.functional.TestBootstrap
 import org.apache.hudi.keygen.{ComplexKeyGenerator, 
NonpartitionedKeyGenerator, SimpleKeyGenerator}
@@ -46,7 +46,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, 
assertFalse, assertTrue,
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, 
ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, 
MethodSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
@@ -657,13 +657,21 @@ class TestHoodieSparkSqlWriter {
    * @param tableType Type of table
    */
   @ParameterizedTest
-  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
-  def testSchemaEvolutionForTableType(tableType: String): Unit = {
+  @CsvSource(value = Array(
+    "COPY_ON_WRITE,true",
+    "COPY_ON_WRITE,false",
+    "MERGE_ON_READ,true",
+    "MERGE_ON_READ,false"
+  ))
+  def testSchemaEvolutionForTableType(tableType: String, allowColumnDrop: 
Boolean): Unit = {
+    val opts = getCommonParams(tempPath, hoodieFooTableName, tableType) ++ Map(
+      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> 
allowColumnDrop.toString
+    )
+
     // Create new table
     // NOTE: We disable Schema Reconciliation by default (such that Writer's
     //       schema is favored over existing Table's schema)
-    val noReconciliationOpts = getCommonParams(tempPath, hoodieFooTableName, 
tableType)
-      .updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "false")
+    val noReconciliationOpts = 
opts.updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "false")
 
     // Generate 1st batch
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -748,22 +756,29 @@ class TestHoodieSparkSqlWriter {
     recordsSeq = convertRowListToSeq(records)
 
     val df5 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, 
noReconciliationOpts, df5)
 
-    val snapshotDF5 = spark.read.format("org.apache.hudi")
-      .load(tempBasePath + "/*/*/*/*")
+    if (allowColumnDrop) {
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, 
noReconciliationOpts, df5)
 
-    assertEquals(35, snapshotDF5.count())
+      val snapshotDF5 = spark.read.format("org.apache.hudi")
+        .load(tempBasePath + "/*/*/*/*")
 
-    assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 
0)
+      assertEquals(35, snapshotDF5.count())
 
-    val fifthBatchActualSchema = fetchActualSchema()
-    val fifthBatchExpectedSchema = {
-      val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
-      AvroConversionUtils.convertStructTypeToAvroSchema(df5.schema, 
structName, nameSpace)
-    }
+      
assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 0)
+
+      val fifthBatchActualSchema = fetchActualSchema()
+      val fifthBatchExpectedSchema = {
+        val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
+        AvroConversionUtils.convertStructTypeToAvroSchema(df5.schema, 
structName, nameSpace)
+      }
 
-    assertEquals(fifthBatchExpectedSchema, fifthBatchActualSchema)
+      assertEquals(fifthBatchExpectedSchema, fifthBatchActualSchema)
+    } else {
+      assertThrows[SchemaCompatibilityException] {
+        HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, 
noReconciliationOpts, df5)
+      }
+    }
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index bc0585a01e5..ccbd04a45b6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -107,11 +107,11 @@ class TestBasicSchemaEvolution extends 
HoodieClientTestBase with ScalaAssertionS
         DataSourceWriteOptions.OPERATION.key -> opType
       )
 
-    def appendData(schema: StructType, batch: Seq[Row]): Unit = {
+    def appendData(schema: StructType, batch: Seq[Row], 
shouldAllowDroppedColumns: Boolean = false): Unit = {
       HoodieUnsafeUtils.createDataFrameFromRows(spark, batch, schema)
         .write
         .format("org.apache.hudi")
-        .options(opts)
+        .options(opts ++ 
Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> 
shouldAllowDroppedColumns.toString))
         .mode(SaveMode.Append)
         .save(basePath)
     }
@@ -202,7 +202,8 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase 
with ScalaAssertionS
     }
 
     //
-    // 3. Write 3d batch with another schema (w/ omitted a _nullable_ column 
`second_name`, expected to succeed)
+    // 3. Write 3rd batch with another schema (w/ a _nullable_ column 
`second_name` omitted, expected to succeed if
+    // col drop is enabled)
     //
 
     val thirdSchema = StructType(
@@ -217,7 +218,14 @@ class TestBasicSchemaEvolution extends 
HoodieClientTestBase with ScalaAssertionS
       Row("8", "Ron", "14", 1, 1),
       Row("9", "Germiona", "16", 1, 1))
 
-    appendData(thirdSchema, thirdBatch)
+    if (shouldReconcileSchema) {
+      appendData(thirdSchema, thirdBatch)
+    } else {
+      assertThrows(classOf[SchemaCompatibilityException]) {
+        appendData(thirdSchema, thirdBatch)
+      }
+      appendData(thirdSchema, thirdBatch, shouldAllowDroppedColumns = true)
+    }
     val (tableSchemaAfterThirdBatch, rowsAfterThirdBatch) = loadTable()
 
     // NOTE: In case schema reconciliation is ENABLED, Hudi would prefer the 
table's schema over the new batch
@@ -270,7 +278,10 @@ class TestBasicSchemaEvolution extends 
HoodieClientTestBase with ScalaAssertionS
         appendData(fourthSchema, fourthBatch)
       }
     } else {
-      appendData(fourthSchema, fourthBatch)
+      assertThrows(classOf[SchemaCompatibilityException]) {
+        appendData(fourthSchema, fourthBatch)
+      }
+      appendData(fourthSchema, fourthBatch, shouldAllowDroppedColumns = true)
       val (latestTableSchema, rows) = loadTable()
 
       assertEquals(fourthSchema, latestTableSchema)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 15b6751328c..cb807319d94 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -68,10 +68,12 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
   ), delimiter = '|')
   def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, 
recordKeys: String): Unit = {
-    var options: Map[String, String] = commonOpts +
-      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled)) +
-      (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
-      (DataSourceWriteOptions.RECORDKEY_FIELD.key() -> recordKeys)
+    var options: Map[String, String] = commonOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
+      HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")
+
     val isTimestampBasedKeyGen: Boolean = 
classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
     if (isTimestampBasedKeyGen) {
       options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 0d3b09b4c17..9e674db3541 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.{LocatedFileStatus, Path}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, 
RECORDKEY_FIELD}
 import org.apache.hudi.HoodieConversionUtils.toProperties
-import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig}
+import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.ParquetUtils
@@ -31,7 +31,6 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.typedLit
 import org.apache.spark.sql.types._
@@ -42,7 +41,6 @@ import org.junit.jupiter.params.provider.{Arguments, 
EnumSource, MethodSource, V
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
-
 import scala.collection.JavaConverters._
 import scala.util.Random
 
@@ -193,7 +191,8 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
       HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true"
     ) ++ metadataOpts
 
     val sourceJSONTablePath = 
getClass.getClassLoader.getResource("index/colstats/input-table-json").toString

Reply via email to