This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 78bf676175968251832f29712b37d09bc4b49c41
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Mar 5 11:37:09 2024 -0800

    [HUDI-7413] Fix schema exception types and error messages thrown with 
schema exceptions (#10677)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../java/org/apache/hudi/table/HoodieTable.java    |   5 +-
 .../apache/hudi/avro/AvroSchemaCompatibility.java  |  48 +++-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 162 ++++++++----
 .../hudi/common/table/TableSchemaResolver.java     |   4 +-
 .../exception/HoodieNullSchemaTypeException.java   |  32 +++
 .../hudi/exception/InvalidUnionTypeException.java  |  33 +++
 ...ption.java => MissingSchemaFieldException.java} |  20 +-
 .../SchemaBackwardsCompatibilityException.java     |  45 ++++
 .../exception/SchemaCompatibilityException.java    |   4 +-
 .../convert/AvroInternalSchemaConverter.java       |  31 ++-
 .../org/apache/hudi/avro/TestAvroSchemaUtils.java  |  25 ++
 .../hudi/common/table/TestTableSchemaResolver.java |   4 +-
 .../schema/utils/TestAvroSchemaEvolutionUtils.java |  35 +++
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |   6 +-
 .../scala/org/apache/hudi/HoodieSchemaUtils.scala  |  42 ++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  15 --
 .../org/apache/hudi/TestHoodieSchemaUtils.java     | 286 +++++++++++++++++++++
 .../apache/hudi/functional/TestCOWDataSource.scala |  50 +++-
 .../apache/hudi/utilities/streamer/StreamSync.java |   7 +-
 ...estHoodieDeltaStreamerSchemaEvolutionQuick.java |  10 +-
 20 files changed, 745 insertions(+), 119 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index d5244ac427c..ed4e088ebeb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -67,6 +67,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
 import org.apache.hudi.hadoop.fs.ConsistencyGuard;
 import org.apache.hudi.hadoop.fs.ConsistencyGuard.FileVisibility;
 import org.apache.hudi.index.HoodieIndex;
@@ -854,8 +855,10 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
       Schema writerSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
       Schema tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
       AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, 
shouldValidate, allowProjection, getDropPartitionColNames());
+    } catch (SchemaCompatibilityException e) {
+      throw e;
     } catch (Exception e) {
-      throw new HoodieException("Failed to read schema/check compatibility for 
base path " + metaClient.getBasePath(), e);
+      throw new SchemaCompatibilityException("Failed to read schema/check 
compatibility for base path " + metaClient.getBasePath(), e);
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java
index f25824dbd4a..8ed0830815e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java
@@ -36,6 +36,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -283,6 +284,35 @@ public class AvroSchemaCompatibility {
       return result;
     }
 
+    private static String getLocationName(final Deque<LocationInfo> locations, 
Type readerType) {
+      StringBuilder sb = new StringBuilder();
+      Iterator<LocationInfo> locationInfoIterator = locations.iterator();
+      boolean addDot = false;
+      while (locationInfoIterator.hasNext()) {
+        if (addDot) {
+          sb.append(".");
+        } else {
+          addDot = true;
+        }
+        LocationInfo next = locationInfoIterator.next();
+        sb.append(next.name);
+        // We only inspect the reader type when we are at the last location.
+        // If that type is array/map, the real problem is that the writer's
+        // field type is not array/map; for any other type, the mismatch lies
+        // between the reader's and writer's array element / map value schemas.
+        if (next.type.equals(Type.MAP)) {
+          if (locationInfoIterator.hasNext() || !readerType.equals(Type.MAP)) {
+            sb.append(".value");
+          }
+        } else if (next.type.equals(Type.ARRAY)) {
+          if (locationInfoIterator.hasNext() || 
!readerType.equals(Type.ARRAY)) {
+            sb.append(".element");
+          }
+        }
+      }
+      return sb.toString();
+    }
+
     /**
      * Calculates the compatibility of a reader/writer schema pair.
      *
@@ -335,7 +365,7 @@ public class AvroSchemaCompatibility {
             for (final Schema writerBranch : writer.getTypes()) {
               SchemaCompatibilityResult compatibility = 
getCompatibility(reader, writerBranch, locations);
               if (compatibility.getCompatibility() == 
SchemaCompatibilityType.INCOMPATIBLE) {
-                String message = String.format("reader union lacking writer 
type: %s", writerBranch.getType());
+                String message = String.format("reader union lacking writer 
type: %s for field: '%s'", writerBranch.getType(), getLocationName(locations, 
reader.getType()));
                 result = 
result.mergedWith(SchemaCompatibilityResult.incompatible(
                     SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, 
writer, message, asList(locations)));
               }
@@ -407,7 +437,7 @@ public class AvroSchemaCompatibility {
             }
             // No branch in the reader union has been found compatible with 
the writer
             // schema:
-            String message = String.format("reader union lacking writer type: 
%s", writer.getType());
+            String message = String.format("reader union lacking writer type: 
%s for field: '%s'", writer.getType(), getLocationName(locations, 
reader.getType()));
             return result.mergedWith(SchemaCompatibilityResult
                 .incompatible(SchemaIncompatibilityType.MISSING_UNION_BRANCH, 
reader, writer, message, asList(locations)));
           }
@@ -433,9 +463,10 @@ public class AvroSchemaCompatibility {
           // reader field must have a default value.
           if (defaultValueAccessor.getDefaultValue(readerField) == null) {
             // reader field has no default value
+            String message = String.format("Field '%s.%s' has no default 
value", getLocationName(locations, readerField.schema().getType()), 
readerField.name());
             result = result.mergedWith(
                 
SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.READER_FIELD_MISSING_DEFAULT_VALUE,
-                    reader, writer, readerField.name(), asList(locations)));
+                    reader, writer, message, asList(locations)));
           }
         } else {
           locations.addLast(new LocationInfo(readerField.name(), 
readerField.schema().getType()));
@@ -482,8 +513,9 @@ public class AvroSchemaCompatibility {
       final Set<String> symbols = new TreeSet<>(writer.getEnumSymbols());
       symbols.removeAll(reader.getEnumSymbols());
       if (!symbols.isEmpty()) {
+        String message = String.format("Field '%s' missing enum symbols: %s", 
getLocationName(locations, reader.getType()), symbols);
         result = 
SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.MISSING_ENUM_SYMBOLS,
 reader,
-            writer, symbols.toString(), asList(locations));
+            writer, message, asList(locations));
       }
       return result;
     }
@@ -494,7 +526,7 @@ public class AvroSchemaCompatibility {
       int actual = reader.getFixedSize();
       int expected = writer.getFixedSize();
       if (actual != expected) {
-        String message = String.format("expected: %d, found: %d", expected, 
actual);
+        String message = String.format("Fixed size field '%s' expected: %d, 
found: %d", getLocationName(locations, reader.getType()), expected, actual);
         result = 
SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.FIXED_SIZE_MISMATCH,
 reader, writer,
             message, asList(locations));
       }
@@ -511,7 +543,7 @@ public class AvroSchemaCompatibility {
       boolean shouldCheckNames = checkNaming && (locations.size() == 1 || 
locations.peekLast().type == Type.UNION);
       SchemaCompatibilityResult result = 
SchemaCompatibilityResult.compatible();
       if (shouldCheckNames && !Objects.equals(reader.getFullName(), 
writer.getFullName())) {
-        String message = String.format("expected: %s", writer.getFullName());
+        String message = String.format("Reader schema name: '%s' is not 
compatible with writer schema name: '%s'", reader.getFullName(), 
writer.getFullName());
         result = 
SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.NAME_MISMATCH, 
reader, writer,
             message, asList(locations));
       }
@@ -520,8 +552,8 @@ public class AvroSchemaCompatibility {
 
     private SchemaCompatibilityResult typeMismatch(final Schema reader, final 
Schema writer,
                                                    final Deque<LocationInfo> 
locations) {
-      String message = String.format("reader type: %s not compatible with 
writer type: %s", reader.getType(),
-          writer.getType());
+      String message = String.format("reader type '%s' not compatible with 
writer type '%s' for field '%s'", reader.getType(),
+          writer.getType(), getLocationName(locations, reader.getType()));
       return 
SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.TYPE_MISMATCH, 
reader, writer, message,
           asList(locations));
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 5ec466cca3d..6d546263047 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -18,13 +18,19 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.exception.MissingSchemaFieldException;
+import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
 import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.exception.InvalidUnionTypeException;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
 
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -90,20 +96,20 @@ public class AvroSchemaUtils {
    * @return true if prev schema is a projection of new schema.
    */
   public static boolean canProject(Schema prevSchema, Schema newSchema) {
-    return canProject(prevSchema, newSchema, Collections.emptySet());
+    return findMissingFields(prevSchema, newSchema, 
Collections.emptySet()).isEmpty();
   }
 
   /**
-   * Check that each field in the prevSchema can be populated in the newSchema 
except specified columns
+   * Check that each top level field in the prevSchema can be populated in the 
newSchema except specified columns
    * @param prevSchema prev schema.
    * @param newSchema new schema
-   * @return true if prev schema is a projection of new schema.
+   * @return List of fields that should be in the new schema
    */
-  public static boolean canProject(Schema prevSchema, Schema newSchema, 
Set<String> exceptCols) {
+  private static List<Schema.Field> findMissingFields(Schema prevSchema, 
Schema newSchema, Set<String> exceptCols) {
     return prevSchema.getFields().stream()
         .filter(f -> !exceptCols.contains(f.name()))
-        .map(oldSchemaField -> 
SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField))
-        .noneMatch(Objects::isNull);
+        .filter(oldSchemaField -> 
SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField) == null)
+        .collect(Collectors.toList());
   }
 
   /**
@@ -119,31 +125,6 @@ public class AvroSchemaUtils {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
   }
 
-  /**
-   * Validate whether the {@code targetSchema} is a valid evolution of {@code 
sourceSchema}.
-   * Basically {@link #isCompatibleProjectionOf(Schema, Schema)} but type 
promotion in the
-   * opposite direction
-   */
-  public static boolean isValidEvolutionOf(Schema sourceSchema, Schema 
targetSchema) {
-    return (sourceSchema.getType() == Schema.Type.NULL) || 
isProjectionOfInternal(sourceSchema, targetSchema,
-        AvroSchemaUtils::isAtomicSchemasCompatibleEvolution);
-  }
-
-  /**
-   * Establishes whether {@code newReaderSchema} is compatible w/ {@code 
prevWriterSchema}, as
-   * defined by Avro's {@link AvroSchemaCompatibility}.
-   * {@code newReaderSchema} is considered compatible to {@code 
prevWriterSchema}, iff data written using {@code prevWriterSchema}
-   * could be read by {@code newReaderSchema}
-   * @param newReaderSchema new reader schema instance.
-   * @param prevWriterSchema prev writer schema instance.
-   * @return true if its compatible. else false.
-   */
-  private static boolean isAtomicSchemasCompatibleEvolution(Schema 
newReaderSchema, Schema prevWriterSchema) {
-    // NOTE: Checking for compatibility of atomic types, we should ignore their
-    //       corresponding fully-qualified names (as irrelevant)
-    return isSchemaCompatible(prevWriterSchema, newReaderSchema, false, true);
-  }
-
   /**
    * Validate whether the {@code targetSchema} is a "compatible" projection of 
{@code sourceSchema}.
    * Only difference of this method from {@link #isStrictProjectionOf(Schema, 
Schema)} is
@@ -352,25 +333,118 @@ public class AvroSchemaUtils {
       boolean allowProjection,
       Set<String> dropPartitionColNames) throws SchemaCompatibilityException {
 
-    String errorMessage = null;
-
-    if (!allowProjection && !canProject(tableSchema, writerSchema, 
dropPartitionColNames)) {
-      errorMessage = "Column dropping is not allowed";
+    if (!allowProjection) {
+      List<Schema.Field> missingFields = findMissingFields(tableSchema, 
writerSchema, dropPartitionColNames);
+      if (!missingFields.isEmpty()) {
+        throw new 
MissingSchemaFieldException(missingFields.stream().map(Schema.Field::name).collect(Collectors.toList()),
 writerSchema, tableSchema);
+      }
     }
 
     // TODO(HUDI-4772) re-enable validations in case partition columns
     //                 being dropped from the data-file after fixing the write 
schema
-    if (dropPartitionColNames.isEmpty() && shouldValidate && 
!isSchemaCompatible(tableSchema, writerSchema)) {
-      errorMessage = "Failed schema compatibility check";
+    if (dropPartitionColNames.isEmpty() && shouldValidate) {
+      AvroSchemaCompatibility.SchemaPairCompatibility result =
+          AvroSchemaCompatibility.checkReaderWriterCompatibility(writerSchema, 
tableSchema, true);
+      if (result.getType() != 
AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
+        throw new SchemaBackwardsCompatibilityException(result, writerSchema, 
tableSchema);
+      }
     }
+  }
 
-    if (errorMessage != null) {
-      String errorDetails = String.format(
-          "%s\nwriterSchema: %s\ntableSchema: %s",
-          errorMessage,
-          writerSchema,
-          tableSchema);
-      throw new SchemaCompatibilityException(errorDetails);
+  /**
+   * Validate whether the {@code incomingSchema} is a valid evolution of 
{@code tableSchema}.
+   *
+   * @param incomingSchema schema of the incoming dataset
+   * @param tableSchema latest table schema
+   */
+  public static void checkValidEvolution(Schema incomingSchema, Schema 
tableSchema) {
+    if (incomingSchema.getType() == Schema.Type.NULL) {
+      return;
     }
+
+    // Not strictly required when `hoodie.write.set.null.for.missing.columns`
+    // is enabled, but it is a worthwhile safety check regardless.
+    List<String> missingFields = new ArrayList<>();
+    findAnyMissingFields(incomingSchema, tableSchema, new ArrayDeque<>(), 
missingFields);
+    if (!missingFields.isEmpty()) {
+      throw new MissingSchemaFieldException(missingFields, incomingSchema, 
tableSchema);
+    }
+
+    //make sure that the table schema can be read using the incoming schema
+    AvroSchemaCompatibility.SchemaPairCompatibility result =
+        AvroSchemaCompatibility.checkReaderWriterCompatibility(incomingSchema, 
tableSchema, false);
+    if (result.getType() != 
AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
+      throw new SchemaBackwardsCompatibilityException(result, incomingSchema, 
tableSchema);
+    }
+  }
+
+  /**
+   * Find all fields in the latest table schema that are not in
+   * the incoming schema.
+   */
+  private static void findAnyMissingFields(Schema incomingSchema,
+                                           Schema latestTableSchema,
+                                           Deque<String> visited,
+                                           List<String> missingFields) {
+    findAnyMissingFieldsRec(incomingSchema, latestTableSchema, visited,
+        missingFields, incomingSchema, latestTableSchema);
+  }
+
+  /**
+   * The full schemas are threaded through the recursion so that any error
+   * message can print the complete incoming and table schemas.
+   */
+  private static void findAnyMissingFieldsRec(Schema incomingSchema,
+                                              Schema latestTableSchema,
+                                              Deque<String> visited,
+                                              List<String> missingFields,
+                                              Schema fullIncomingSchema,
+                                              Schema fullTableSchema) {
+    if (incomingSchema.getType() == latestTableSchema.getType()) {
+      if (incomingSchema.getType() == Schema.Type.RECORD) {
+        visited.addLast(latestTableSchema.getName());
+        for (Schema.Field targetField : latestTableSchema.getFields()) {
+          visited.addLast(targetField.name());
+          Schema.Field sourceField = 
incomingSchema.getField(targetField.name());
+          if (sourceField == null) {
+            missingFields.add(String.join(".", visited));
+          } else {
+            findAnyMissingFieldsRec(sourceField.schema(), 
targetField.schema(), visited,
+                missingFields, fullIncomingSchema, fullTableSchema);
+          }
+          visited.removeLast();
+        }
+        visited.removeLast();
+      } else if (incomingSchema.getType() == Schema.Type.ARRAY) {
+        visited.addLast("element");
+        findAnyMissingFieldsRec(incomingSchema.getElementType(), 
latestTableSchema.getElementType(),
+            visited, missingFields, fullIncomingSchema, fullTableSchema);
+        visited.removeLast();
+      } else if (incomingSchema.getType() == Schema.Type.MAP) {
+        visited.addLast("value");
+        findAnyMissingFieldsRec(incomingSchema.getValueType(), 
latestTableSchema.getValueType(),
+            visited, missingFields, fullIncomingSchema, fullTableSchema);
+        visited.removeLast();
+      } else if (incomingSchema.getType() == Schema.Type.UNION) {
+        List<Schema> incomingNestedSchemas = incomingSchema.getTypes();
+        List<Schema> latestTableNestedSchemas = latestTableSchema.getTypes();
+        if (incomingNestedSchemas.size() != latestTableNestedSchemas.size()) {
+          throw new InvalidUnionTypeException(createSchemaErrorString(
+              String.format("Incoming batch field '%s' has union with %d 
types, while the table schema has %d types",
+              String.join(".", visited), incomingNestedSchemas.size(), 
latestTableNestedSchemas.size()), fullIncomingSchema, fullTableSchema));
+        }
+        if (incomingNestedSchemas.size() > 2) {
+          throw new InvalidUnionTypeException(createSchemaErrorString(
+              String.format("Union for incoming batch field '%s' should not 
have more than 2 types but has %d",
+              String.join(".", visited), incomingNestedSchemas.size()), 
fullIncomingSchema, fullTableSchema));
+        }
+        for (int i = 0; i < incomingNestedSchemas.size(); ++i) {
+          findAnyMissingFieldsRec(incomingNestedSchemas.get(i), 
latestTableNestedSchemas.get(i), visited,
+              missingFields, fullIncomingSchema, fullTableSchema);
+        }
+      }
+    }
+  }
+
+  public static String createSchemaErrorString(String errorMessage, Schema 
writerSchema, Schema tableSchema) {
+    return String.format("%s\nwriterSchema: %s\ntableSchema: %s", 
errorMessage, writerSchema, tableSchema);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 5291c725218..f37dd4e7540 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -37,8 +37,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
 import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
@@ -567,7 +567,7 @@ public class TableSchemaResolver {
     boolean hasPartitionColNotInSchema = 
Arrays.stream(partitionFields.get()).anyMatch(pf -> 
!containsFieldInSchema(dataSchema, pf));
     boolean hasPartitionColInSchema = 
Arrays.stream(partitionFields.get()).anyMatch(pf -> 
containsFieldInSchema(dataSchema, pf));
     if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
-      throw new HoodieIncompatibleSchemaException("Partition columns could not 
be partially contained w/in the data schema");
+      throw new HoodieSchemaException("Partition columns could not be 
partially contained w/in the data schema");
     }
 
     if (hasPartitionColNotInSchema) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieNullSchemaTypeException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieNullSchemaTypeException.java
new file mode 100644
index 00000000000..ff4abadcde9
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieNullSchemaTypeException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+
+/**
+ * Thrown if a schema is null or empty, or if a field has type null.
+ * (A null type is acceptable only inside a union with exactly one other type.)
+ */
+public class HoodieNullSchemaTypeException extends HoodieSchemaException {
+  public HoodieNullSchemaTypeException(String message) {
+    super(message);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/InvalidUnionTypeException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/InvalidUnionTypeException.java
new file mode 100644
index 00000000000..370ad9438cc
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/InvalidUnionTypeException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Thrown when a field is a union and at least one of the following is true:
+ * <ul>
+ *   <li>the incoming union and the latest table union have differing numbers 
of types</li>
+ *   <li>the incoming union has more than two types</li>
+ * </ul>
+ */
+public class InvalidUnionTypeException extends SchemaCompatibilityException {
+  public InvalidUnionTypeException(String message) {
+    super(message);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/MissingSchemaFieldException.java
similarity index 51%
rename from 
hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java
rename to 
hudi-common/src/main/java/org/apache/hudi/exception/MissingSchemaFieldException.java
index a739af67909..4727ff814f1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/MissingSchemaFieldException.java
@@ -18,16 +18,24 @@
 
 package org.apache.hudi.exception;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+
 /**
- * Exception for incompatible schema.
+ * Thrown when the schema of the incoming data is missing fields that are in 
the table schema.
  */
-public class HoodieIncompatibleSchemaException extends RuntimeException {
+public class MissingSchemaFieldException extends SchemaCompatibilityException {
 
-  public HoodieIncompatibleSchemaException(String msg, Throwable e) {
-    super(msg, e);
+  public MissingSchemaFieldException(List<String> missingFields, Schema 
writerSchema, Schema tableSchema) {
+    super(constructExceptionMessage(missingFields, writerSchema, tableSchema));
   }
 
-  public HoodieIncompatibleSchemaException(String msg) {
-    super(msg);
+  private static String constructExceptionMessage(List<String> missingFields, 
Schema writerSchema, Schema tableSchema) {
+    return AvroSchemaUtils.createSchemaErrorString(
+        "Schema validation failed due to missing field. Fields missing from 
incoming schema: {"
+        + String.join(", ", missingFields) + "}", writerSchema, tableSchema);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/SchemaBackwardsCompatibilityException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/SchemaBackwardsCompatibilityException.java
new file mode 100644
index 00000000000..c38d13c9e29
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/SchemaBackwardsCompatibilityException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+import org.apache.hudi.avro.AvroSchemaCompatibility;
+import org.apache.hudi.avro.AvroSchemaUtils;
+
+import org.apache.avro.Schema;
+
+import java.util.stream.Collectors;
+
+/**
+ * Thrown when there is a backwards-compatibility issue with the incoming
+ * schema, i.e. when the incoming schema cannot be used to read older data
+ * files.
+ */
+public class SchemaBackwardsCompatibilityException extends 
SchemaCompatibilityException {
+
+  public 
SchemaBackwardsCompatibilityException(AvroSchemaCompatibility.SchemaPairCompatibility
 compatibility, Schema writerSchema, Schema tableSchema) {
+    super(constructExceptionMessage(compatibility, writerSchema, tableSchema));
+  }
+
+  private static String 
constructExceptionMessage(AvroSchemaCompatibility.SchemaPairCompatibility 
compatibility, Schema writerSchema, Schema tableSchema) {
+    return AvroSchemaUtils.createSchemaErrorString("Schema validation 
backwards compatibility check failed with the following issues: {"
+        + compatibility.getResult().getIncompatibilities().stream()
+            .map(incompatibility -> incompatibility.getType().name() + ": " + 
incompatibility.getMessage())
+            .collect(Collectors.joining(", ")) + "}", writerSchema, 
tableSchema);
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/SchemaCompatibilityException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/SchemaCompatibilityException.java
index 478ec0d4269..92d2f6744c1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/exception/SchemaCompatibilityException.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/SchemaCompatibilityException.java
@@ -18,10 +18,12 @@
 
 package org.apache.hudi.exception;
 
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+
 /**
  * An exception thrown when schema has compatibility problems.
  */
-public class SchemaCompatibilityException extends HoodieException {
+public class SchemaCompatibilityException extends HoodieSchemaException {
 
   public SchemaCompatibilityException(String message) {
     super(message);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
index f80eb91522c..54f9cb65ba8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.internal.schema.convert;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieNullSchemaTypeException;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Type;
@@ -32,6 +33,7 @@ import org.apache.avro.Schema;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -79,7 +81,7 @@ public class AvroInternalSchemaConverter {
    * but for the metadata table HoodieMetadata.avsc uses a trick where we have 
a bunch of
    * different types wrapped in record for col stats.
    *
-   * @param Schema avro schema.
+   * @param schema avro schema.
    * @return an avro Schema where null is the first.
    */
   public static Schema fixNullOrdering(Schema schema) {
@@ -156,6 +158,29 @@ public class AvroInternalSchemaConverter {
     return visitAvroSchemaToBuildType(schema, visited, true, nextId);
   }
 
+  private static void checkNullType(Type fieldType, String fieldName, 
Deque<String> visited) {
+    if (fieldType == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Field '");
+      Iterator<String> visitedIterator = visited.descendingIterator();
+      while (visitedIterator.hasNext()) {
+        sb.append(visitedIterator.next());
+        sb.append(".");
+      }
+      sb.append(fieldName);
+      sb.append("' has type null");
+      throw new HoodieNullSchemaTypeException(sb.toString());
+    } else if (fieldType.typeId() == Type.TypeID.ARRAY) {
+      visited.push(fieldName);
+      checkNullType(((Types.ArrayType) fieldType).elementType(), "element", 
visited);
+      visited.pop();
+    } else if (fieldType.typeId() == Type.TypeID.MAP) {
+      visited.push(fieldName);
+      checkNullType(((Types.MapType) fieldType).valueType(), "value", visited);
+      visited.pop();
+    }
+  }
+
   /**
    * Converts an avro schema into hudi type.
    *
@@ -182,7 +207,9 @@ public class AvroInternalSchemaConverter {
         }
         nextId.set(nextAssignId + fields.size());
         fields.stream().forEach(field -> {
-          fieldTypes.add(visitAvroSchemaToBuildType(field.schema(), visited, 
false, nextId));
+          Type fieldType = visitAvroSchemaToBuildType(field.schema(), visited, 
false, nextId);
+          checkNullType(fieldType, field.name(), visited);
+          fieldTypes.add(fieldType);
         });
         visited.pop();
         List<Types.Field> internalFields = new ArrayList<>(fields.size());
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
index c05683e605c..ea2301ce080 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
 import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import org.apache.avro.Schema;
@@ -229,4 +230,28 @@ public class TestAvroSchemaUtils {
   public void testIsCompatiblePartitionDropCols(boolean shouldValidate) {
     AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, 
shouldValidate, false, Collections.singleton("c"));
   }
+
+  private static final Schema BROKEN_SCHEMA = new Schema.Parser().parse("{\n"
+      + "  \"type\" : \"record\",\n"
+      + "  \"name\" : \"broken\",\n"
+      + "  \"fields\" : [ {\n"
+      + "    \"name\" : \"a\",\n"
+      + "    \"type\" : [ \"null\", \"int\" ],\n"
+      + "    \"default\" : null\n"
+      + "  }, {\n"
+      + "    \"name\" : \"b\",\n"
+      + "    \"type\" : [ \"null\", \"int\" ],\n"
+      + "    \"default\" : null\n"
+      + "  }, {\n"
+      + "    \"name\" : \"c\",\n"
+      + "    \"type\" : [ \"null\", \"boolean\" ],\n"
+      + "    \"default\" : null\n"
+      + "  } ]\n"
+      + "}");
+
+  @Test
+  public void testBrokenSchema() {
+    assertThrows(SchemaBackwardsCompatibilityException.class,
+        () -> AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, 
BROKEN_SCHEMA, true, false, Collections.emptySet()));
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 3ac42b9d3b7..b7f0ba8eba7 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -21,7 +21,7 @@ package org.apache.hudi.common.table;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
@@ -61,7 +61,7 @@ public class TestTableSchemaResolver {
     String[] pts4 = {"user_partition", "partition_path"};
     try {
       TableSchemaResolver.appendPartitionColumns(originSchema, 
Option.of(pts3));
-    } catch (HoodieIncompatibleSchemaException e) {
+    } catch (HoodieSchemaException e) {
       assertTrue(e.getMessage().contains("Partial partition fields are still 
in the schema"));
     }
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
index 0be0a5f89c5..4027bd28178 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.internal.schema.utils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.exception.HoodieNullSchemaTypeException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.InternalSchemaBuilder;
 import org.apache.hudi.internal.schema.Type;
@@ -46,6 +47,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 /**
  * Tests {@link AvroSchemaEvolutionUtils}.
  */
@@ -184,6 +188,37 @@ public class TestAvroSchemaEvolutionUtils {
     Assertions.assertEquals(schema, 
AvroInternalSchemaConverter.convert(internalSchema, "newTableName"));
   }
 
+  @Test
+  public void testNullFieldType() {
+    Schema schema = create("t1",
+        new Schema.Field("nullField", Schema.create(Schema.Type.NULL), null, 
JsonProperties.NULL_VALUE));
+    Throwable t = assertThrows(HoodieNullSchemaTypeException.class,
+        () -> AvroInternalSchemaConverter.convert(schema));
+    assertTrue(t.getMessage().contains("'t1.nullField'"));
+
+    Schema schemaArray = create("t2",
+        new Schema.Field("nullArray", 
Schema.createArray(Schema.create(Schema.Type.NULL)), null, null));
+    t = assertThrows(HoodieNullSchemaTypeException.class,
+        () -> AvroInternalSchemaConverter.convert(schemaArray));
+    assertTrue(t.getMessage().contains("'t2.nullArray.element'"));
+
+    Schema schemaMap = create("t3",
+        new Schema.Field("nullMap", 
Schema.createMap(Schema.create(Schema.Type.NULL)), null, null));
+    t = assertThrows(HoodieNullSchemaTypeException.class,
+        () -> AvroInternalSchemaConverter.convert(schemaMap));
+    assertTrue(t.getMessage().contains("'t3.nullMap.value'"));
+
+
+    Schema schemaComplex = create("t4",
+        new Schema.Field("complexField", Schema.createMap(
+            create("nestedStruct",
+                new Schema.Field("nestedArray", 
Schema.createArray(Schema.createMap(Schema.create(Schema.Type.NULL))),
+                    null, null))), null, null));
+    t = assertThrows(HoodieNullSchemaTypeException.class,
+        () -> AvroInternalSchemaConverter.convert(schemaComplex));
+    
assertTrue(t.getMessage().contains("'t4.nestedStruct.nestedArray.element.value'"));
+  }
+
   @Test
   public void testRefreshNewId() {
     Types.RecordType record = Types.RecordType.get(Types.Field.get(0, false, 
"id", Types.IntType.get()),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index fea986885f8..47c613ec784 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
-import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.exception.MissingSchemaFieldException;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -557,13 +557,13 @@ public class ITTestDataStreamWrite extends TestLogger {
     } catch (JobExecutionException e) {
       Throwable actualException = e;
       while (actualException != null) {
-        if (actualException.getClass() == SchemaCompatibilityException.class) {
+        if (actualException.getClass() == MissingSchemaFieldException.class) {
           // test is passed
           return;
         }
         actualException = actualException.getCause();
       }
     }
-    throw new AssertionError(String.format("Excepted exception %s is not 
found", SchemaCompatibilityException.class));
+    throw new AssertionError(String.format("Expected exception %s is not 
found", MissingSchemaFieldException.class));
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index 0b42dc75b54..cfc43453e9c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -21,10 +21,10 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_SCHEMA, 
SQL_MERGE_INTO_WRITES}
-import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, 
isSchemaCompatible, isValidEvolutionOf}
+import org.apache.hudi.avro.AvroSchemaUtils.{checkSchemaCompatible, 
checkValidEvolution, isCompatibleProjectionOf, isSchemaCompatible}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields
-import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
@@ -78,7 +78,8 @@ object HoodieSchemaUtils {
                          opts: Map[String, String]): Schema = {
     val setNullForMissingColumns = 
opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
       
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
-    val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldReconcileSchema = 
opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(),
+      
DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean
     val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
       HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
 
@@ -167,34 +168,29 @@ object HoodieSchemaUtils {
                 } else {
                   canonicalizedSourceSchema
                 }
-                if (isValidEvolutionOf(reconciledSchema, latestTableSchema)) {
-                  reconciledSchema
-                } else {
-                  log.error(
-                    s"""Incoming batch schema is not compatible with the 
table's one.
-                       |Incoming schema ${sourceSchema.toString(true)}
-                       |Incoming schema (canonicalized) 
${reconciledSchema.toString(true)}
-                       |Table's schema ${latestTableSchema.toString(true)}
-                       |""".stripMargin)
-                  throw new SchemaCompatibilityException("Incoming batch 
schema is not compatible with the table's one")
-                }
+                checkValidEvolution(reconciledSchema, latestTableSchema)
+                reconciledSchema
               }
-            } else if (isSchemaCompatible(latestTableSchema, 
canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) {
-              canonicalizedSourceSchema
             } else {
-              log.error(
-                s"""Incoming batch schema is not compatible with the table's 
one.
-                   |Incoming schema ${sourceSchema.toString(true)}
-                   |Incoming schema (canonicalized) 
${canonicalizedSourceSchema.toString(true)}
-                   |Table's schema ${latestTableSchema.toString(true)}
-                   |""".stripMargin)
-              throw new SchemaCompatibilityException("Incoming batch schema is 
not compatible with the table's one")
+              checkSchemaCompatible(latestTableSchema, 
canonicalizedSourceSchema, true,
+                allowAutoEvolutionColumnDrop, java.util.Collections.emptySet())
+              canonicalizedSourceSchema
             }
           }
         }
     }
   }
 
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: 
org.apache.hudi.common.util.Option[Schema],
+                         internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
+                         props: TypedProperties): Schema = {
+    deduceWriterSchema(sourceSchema,
+      HoodieConversionUtils.toScalaOption(latestTableSchemaOpt),
+      HoodieConversionUtils.toScalaOption(internalSchemaOpt),
+      HoodieConversionUtils.fromProperties(props))
+  }
+
   /**
    * Canonicalizes [[sourceSchema]] by reconciling it w/ [[latestTableSchema]] 
in following
    *
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index eea93e426fb..dbeb9714333 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -133,21 +133,6 @@ object HoodieSparkSqlWriter {
     new HoodieSparkSqlWriterInternal().bootstrap(sqlContext, mode, optParams, 
df, hoodieTableConfigOpt, streamingWritesParamsOpt, hoodieWriteClient)
   }
 
-  /**
-   * Deduces writer's schema based on
-   * <ul>
-   * <li>Source's schema</li>
-   * <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
-   * </ul>
-   */
-  def deduceWriterSchema(sourceSchema: Schema,
-                         latestTableSchemaOpt: Option[Schema],
-                         internalSchemaOpt: Option[InternalSchema],
-                         props: TypedProperties): Schema = {
-    HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt,
-      internalSchemaOpt, HoodieConversionUtils.fromProperties(props))
-  }
-
   def cleanup(): Unit = {
     Metrics.shutdownAllMetrics()
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieSchemaUtils.java
 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieSchemaUtils.java
new file mode 100644
index 00000000000..b10d0cfa992
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieSchemaUtils.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieNullSchemaTypeException;
+import org.apache.hudi.exception.MissingSchemaFieldException;
+import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieSchemaUtils {
+
+  @Test
+  void testSchemaWithNullField() {
+    Schema withNullfield = createRecord("nullRecord", 
createPrimitiveField("nullField", Schema.Type.NULL));
+    assertThrows(HoodieNullSchemaTypeException.class,
+        () -> deduceWriterSchema(withNullfield, null));
+  }
+
+  @Test
+  void testSimplePromotionWithComplexFields() {
+    Schema start = createRecord("simple", createPrimitiveField("f", 
Schema.Type.INT));
+    Schema end = createRecord("simple", createPrimitiveField("f", 
Schema.Type.LONG));
+    assertEquals(end, deduceWriterSchema(end, start));
+
+    start = createRecord("nested", createNestedField("f", Schema.Type.INT));
+    end = createRecord("nested", createNestedField("f", Schema.Type.LONG));
+    assertEquals(end, deduceWriterSchema(end, start));
+
+    start = createRecord("arrayRec", createArrayField("f", Schema.Type.INT));
+    end = createRecord("arrayRec", createArrayField("f", Schema.Type.LONG));
+    assertEquals(end, deduceWriterSchema(end, start));
+
+    start = createRecord("mapRec", createMapField("f", Schema.Type.INT));
+    end = createRecord("mapRec", createMapField("f", Schema.Type.LONG));
+    assertEquals(end, deduceWriterSchema(end, start));
+  }
+
+  @Test
+  void testAllowedTypePromotions() {
+    Schema.Type[] promotionTypes = new Schema.Type[]{Schema.Type.INT, 
Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING, 
Schema.Type.BYTES};
+    Map<Schema.Type, Pair<Integer,Integer>> allowedPromotions = new 
HashMap<>();
+    //allowedPromotions.key can be promoted to any type in the range 
allowedPromotions.value
+    allowedPromotions.put(Schema.Type.INT, Pair.of(0, 4));
+    allowedPromotions.put(Schema.Type.LONG, Pair.of(1, 4));
+    allowedPromotions.put(Schema.Type.FLOAT, Pair.of(2, 4));
+    allowedPromotions.put(Schema.Type.DOUBLE, Pair.of(3, 4));
+    allowedPromotions.put(Schema.Type.STRING, Pair.of(4, 4));
+    allowedPromotions.put(Schema.Type.BYTES, Pair.of(5, 5));
+
+    Map<Schema.Type, Schema> schemaMap = new HashMap<>();
+    for (Schema.Type type : promotionTypes) {
+      schemaMap.put(type, createRecord("rec",
+          createPrimitiveField("simpleField", type),
+          createArrayField("arrayField", type),
+          createMapField("mapField", type),
+          createNestedField("nestedField", type)));
+    }
+
+    for (int i = 0; i < promotionTypes.length; i++) {
+      Schema startSchema = schemaMap.get(promotionTypes[i]);
+      Pair<Integer,Integer> minMax = allowedPromotions.get(promotionTypes[i]);
+      for (int j = minMax.getLeft(); j <= minMax.getRight(); j++) {
+        Schema endSchema = schemaMap.get(promotionTypes[j]);
+        assertEquals(endSchema, deduceWriterSchema(endSchema, startSchema));
+      }
+    }
+  }
+
+  @Test
+  void testReversePromotions() {
+    Schema.Type[] promotionTypes = new Schema.Type[]{Schema.Type.INT, 
Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING, 
Schema.Type.BYTES};
+    Map<Schema.Type, Pair<Integer,Integer>> reversePromotions = new 
HashMap<>();
+    //Incoming data types in the range reversePromotions.value will be 
promoted to reversePromotions.key
+    //if reversePromotions.key is the current table schema
+    reversePromotions.put(Schema.Type.INT, Pair.of(0, 0));
+    reversePromotions.put(Schema.Type.LONG, Pair.of(0, 1));
+    reversePromotions.put(Schema.Type.FLOAT, Pair.of(0, 2));
+    reversePromotions.put(Schema.Type.DOUBLE, Pair.of(0, 3));
+    reversePromotions.put(Schema.Type.STRING, Pair.of(0, 5));
+    reversePromotions.put(Schema.Type.BYTES, Pair.of(4, 5));
+
+    Map<Schema.Type, Schema> schemaMap = new HashMap<>();
+    for (Schema.Type type : promotionTypes) {
+      schemaMap.put(type, createRecord("rec",
+          createPrimitiveField("simpleField", type),
+          createArrayField("arrayField", type),
+          createMapField("mapField", type),
+          createNestedField("nestedField", type)));
+    }
+
+    for (int i = 0; i < promotionTypes.length; i++) {
+      Schema startSchema = schemaMap.get(promotionTypes[i]);
+      Pair<Integer,Integer> minMax = reversePromotions.get(promotionTypes[i]);
+      for (int j = minMax.getLeft(); j <= minMax.getRight(); j++) {
+        Schema endSchema = schemaMap.get(promotionTypes[j]);
+        assertEquals(startSchema, deduceWriterSchema(endSchema, startSchema));
+      }
+    }
+  }
+
+  @Test
+  void testIllegalPromotionsBetweenPrimitives() {
+    Schema.Type[] promotionTypes = new Schema.Type[]{Schema.Type.INT, 
Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.BYTES};
+    Map<Schema.Type, Schema> schemaMap = new HashMap<>();
+    for (Schema.Type type : promotionTypes) {
+      schemaMap.put(type, createRecord("rec",
+          createPrimitiveField("simpleField", type),
+          createArrayField("arrayField", type),
+          createMapField("mapField", type),
+          createNestedField("nestedField", type)));
+    }
+
+    String[] fieldNames = new String[]{"rec.simpleField", 
"rec.arrayField.element", "rec.mapField.value", "rec.nestedField.nested"};
+    //int, long, float, double can't be promoted to bytes
+    for (int i = 0; i < 4; i++) {
+      Schema startSchema = schemaMap.get(promotionTypes[i]);
+      Schema endSchema = schemaMap.get(Schema.Type.BYTES);
+      Throwable t = assertThrows(SchemaBackwardsCompatibilityException.class,
+          () -> deduceWriterSchema(endSchema, startSchema));
+      String baseString = String.format("TYPE_MISMATCH: reader type 'BYTES' 
not compatible with writer type '%s' for field '%%s'",
+          promotionTypes[i].getName().toUpperCase());
+      for (String fieldName : fieldNames) {
+        assertTrue(t.getMessage().contains(String.format(baseString, 
fieldName)));
+      }
+    }
+  }
+
+  @Test
+  void testIllegalPromotionsBetweenComplexFields() {
+    String[] typeNames = new String[]{"INT", "ARRAY", "MAP", "RECORD"};
+    Schema[] fieldTypes = new Schema[]{createRecord("rec", 
createPrimitiveField("testField", Schema.Type.INT)),
+        createRecord("rec", createArrayField("testField", Schema.Type.INT)),
+        createRecord("rec", createMapField("testField", Schema.Type.INT)),
+        createRecord("rec", createNestedField("testField", Schema.Type.INT))};
+
+    for (int i = 0; i < fieldTypes.length; i++) {
+      for (int j = 0; j < fieldTypes.length; j++) {
+        if (i != j) {
+          Schema startSchema = fieldTypes[i];
+          Schema endSchema = fieldTypes[j];
+          Throwable t = 
assertThrows(SchemaBackwardsCompatibilityException.class,
+              () -> deduceWriterSchema(startSchema, endSchema));
+          String errorMessage = String.format("Schema validation backwards 
compatibility check failed with the following issues: "
+              + "{TYPE_MISMATCH: reader type '%s' not compatible with writer 
type '%s' for field 'rec.testField'}", typeNames[i], typeNames[j]);
+          assertTrue(t.getMessage().startsWith(errorMessage));
+        }
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans =  {true, false})
+  void testMissingColumn(boolean allowDroppedColumns) {
+    //simple case
+    Schema start = createRecord("missingSimpleField",
+        createPrimitiveField("field1", Schema.Type.INT),
+        createPrimitiveField("field2", Schema.Type.INT),
+        createPrimitiveField("field3", Schema.Type.INT));
+    Schema end = createRecord("missingSimpleField",
+        createPrimitiveField("field1", Schema.Type.INT),
+        createPrimitiveField("field3", Schema.Type.INT));
+    try {
+      assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns));
+      assertTrue(allowDroppedColumns);
+    } catch (MissingSchemaFieldException e) {
+      assertFalse(allowDroppedColumns);
+      assertTrue(e.getMessage().contains("missingSimpleField.field2"));
+    }
+
+    //complex case
+    start = createRecord("missingComplexField",
+        createPrimitiveField("field1", Schema.Type.INT),
+        createPrimitiveField("field2", Schema.Type.INT),
+        createArrayField("field3", createRecord("nestedRecord",
+                createPrimitiveField("nestedField1", Schema.Type.INT),
+                createPrimitiveField("nestedField2", Schema.Type.INT),
+                createPrimitiveField("nestedField3", Schema.Type.INT))),
+        createPrimitiveField("field4", Schema.Type.INT));
+    end = createRecord("missingComplexField",
+        createPrimitiveField("field1", Schema.Type.INT),
+        createPrimitiveField("field2", Schema.Type.INT),
+        createPrimitiveField("field4", Schema.Type.INT));
+    try {
+      assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns));
+      assertTrue(allowDroppedColumns);
+    } catch (MissingSchemaFieldException e) {
+      assertFalse(allowDroppedColumns);
+      assertTrue(e.getMessage().contains("missingComplexField.field3"));
+    }
+
+    //partial missing field
+    end = createRecord("missingComplexField",
+        createPrimitiveField("field1", Schema.Type.INT),
+        createArrayField("field3", createRecord("nestedRecord",
+            createPrimitiveField("nestedField2", Schema.Type.INT),
+            createPrimitiveField("nestedField3", Schema.Type.INT))),
+        createPrimitiveField("field4", Schema.Type.INT));
+    try {
+      assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns));
+      assertTrue(allowDroppedColumns);
+    } catch (MissingSchemaFieldException e) {
+      assertFalse(allowDroppedColumns);
+      
assertTrue(e.getMessage().contains("missingComplexField.field3.element.nestedRecord.nestedField1"));
+      assertTrue(e.getMessage().contains("missingComplexField.field2"));
+    }
+  }
+
+  private static Schema deduceWriterSchema(Schema incomingSchema, Schema 
latestTableSchema) {
+    return deduceWriterSchema(incomingSchema, latestTableSchema, false);
+  }
+
+  private static final TypedProperties TYPED_PROPERTIES = new 
TypedProperties();
+
+  private static Schema deduceWriterSchema(Schema incomingSchema, Schema 
latestTableSchema, Boolean addNull) {
+    
TYPED_PROPERTIES.setProperty(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(),
 addNull.toString());
+    return HoodieSchemaUtils.deduceWriterSchema(incomingSchema, 
Option.ofNullable(latestTableSchema),
+        Option.empty(), TYPED_PROPERTIES);
+  }
+
+  private static Schema.Field createNestedField(String name, Schema.Type type) 
{
+    return createNestedField(name, Schema.create(type));
+  }
+
+  private static Schema.Field createNestedField(String name, Schema schema) {
+    return new Schema.Field(name, createRecord(name, new 
Schema.Field("nested", schema, null, null)), null, null);
+  }
+
+  private static Schema.Field createArrayField(String name, Schema.Type type) {
+    return createArrayField(name, Schema.create(type));
+  }
+
+  private static Schema.Field createArrayField(String name, Schema schema) {
+    return new Schema.Field(name, Schema.createArray(schema), null, null);
+  }
+
+  private static Schema.Field createMapField(String name, Schema.Type type) {
+    return createMapField(name, Schema.create(type));
+  }
+
+  private static Schema.Field createMapField(String name, Schema schema) {
+    return new Schema.Field(name, Schema.createMap(schema), null, null);
+  }
+
+  private static Schema.Field createPrimitiveField(String name, Schema.Type 
type) {
+    return new Schema.Field(name, Schema.create(type), null, null);
+  }
+  
+  private static Schema createRecord(String name, Schema.Field... fields) {
+    return Schema.createRecord(name, null, null, false, Arrays.asList(fields));
+  }
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index ff87a90cef8..22a61d58881 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -22,8 +22,9 @@ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, 
KEYGENERATOR_CLASS_NAME}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, 
getQuickstartWriteConfigs}
+import org.apache.hudi.avro.AvroSchemaCompatibility.SchemaIncompatibilityType
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
@@ -36,9 +37,10 @@ import org.apache.hudi.common.util
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.metrics.HoodieMetricsConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
-import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.exception.{HoodieException, 
SchemaBackwardsCompatibilityException}
 import org.apache.hudi.functional.CommonOptionUtils._
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
+import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.metrics.{Metrics, MetricsReporterType}
@@ -1759,6 +1761,50 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(0, result.filter(result("id") === 1).count())
   }
 
+  /** Test case to verify MAKE_NEW_COLUMNS_NULLABLE config parameter. */
+  @Test
+  def testSchemaEvolutionWithNewColumn(): Unit = {
+    val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 
'foo' as event_date")
+    var hudiOptions = Map[String, String](
+      HoodieWriteConfig.TBL_NAME.key() -> "test_hudi_merger",
+      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> "event_id",
+      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "version",
+      DataSourceWriteOptions.OPERATION.key() -> "insert",
+      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key() -> "ts",
+      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> 
"org.apache.hudi.keygen.ComplexKeyGenerator",
+      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true",
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key() -> "false",
+      HoodieWriteConfig.RECORD_MERGER_IMPLS.key() -> 
"org.apache.hudi.HoodieSparkRecordMerger"
+    )
+    
df1.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(basePath)
+
+    // Try adding a string column. This operation is expected to throw 'schema 
not compatible' exception since
+    // 'MAKE_NEW_COLUMNS_NULLABLE' parameter is 'false' by default.
+    val df2 = spark.sql("select '2' as event_id, '2' as ts, '3' as version, 
'foo' as event_date, 'bar' as add_col")
+    try {
+      
(df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
+      fail("Operation succeeded, but was expected to fail.")
+    } catch {
+      case ex: SchemaBackwardsCompatibilityException => {
+        
assertTrue(ex.getMessage.contains(SchemaIncompatibilityType.READER_FIELD_MISSING_DEFAULT_VALUE.name()))
+      }
+      case ex: Exception => {
+        fail(ex)
+      }
+    }
+
+    // Try adding the string column again. This operation is expected to 
succeed since 'MAKE_NEW_COLUMNS_NULLABLE'
+    // parameter has been set to 'true'.
+    hudiOptions = hudiOptions + 
(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key() -> "true")
+    try {
+      
(df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
+    } catch {
+      case ex: Exception => {
+        fail(ex)
+      }
+    }
+  }
+
   def assertLastCommitIsUpsert(): Boolean = {
     val metaClient = HoodieTableMetaClient.builder()
       .setBasePath(basePath)
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index fe8eb909db4..0c68831fcd8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -667,10 +667,9 @@ public class StreamSync implements Serializable, Closeable 
{
             new HoodieConfig(HoodieStreamer.Config.getProps(fs, cfg)), 
metaClient));
     // Deduce proper target (writer's) schema for the input dataset, 
reconciling its
     // schema w/ the table's one
-    Schema targetSchema = HoodieSparkSqlWriter.deduceWriterSchema(
-          HoodieAvroUtils.removeMetadataFields(incomingSchema),
-          HoodieConversionUtils.toScalaOption(latestTableSchemaOpt),
-          HoodieConversionUtils.toScalaOption(internalSchemaOpt), props);
+    Schema targetSchema = HoodieSchemaUtils.deduceWriterSchema(
+        HoodieAvroUtils.removeMetadataFields(incomingSchema),
+          latestTableSchemaOpt, internalSchemaOpt, props);
 
     // Override schema provider with the reconciled target schema
     return new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), 
sourceSchemaProvider,
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index eee30c84411..4a5ad75ea84 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.exception.MissingSchemaFieldException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
@@ -125,6 +125,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       b.add(Arguments.of("COPY_ON_WRITE", true, true, true, true, true));
       b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, true));
       b.add(Arguments.of("MERGE_ON_READ", true, true, true, false, false));
+      b.add(Arguments.of("MERGE_ON_READ", true, true, false, false, false));
       b.add(Arguments.of("MERGE_ON_READ", true, false, true, true, false));
     }
     return b.build();
@@ -220,8 +221,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       addData(df, false);
       deltaStreamer.sync();
       assertTrue(allowNullForDeletedCols);
-    } catch (SchemaCompatibilityException e) {
-      assertTrue(e.getMessage().contains("Incoming batch schema is not 
compatible with the table's one"));
+    } catch (MissingSchemaFieldException e) {
       assertFalse(allowNullForDeletedCols);
       return;
     }
@@ -404,10 +404,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       
assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
           .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
       
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
-    } catch (SchemaCompatibilityException e) {
+    } catch (MissingSchemaFieldException e) {
       assertFalse(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
-      assertTrue(e.getMessage().contains("Incoming batch schema is not 
compatible with the table's one"));
-      assertFalse(allowNullForDeletedCols);
     }
   }
 

Reply via email to