This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c6bf952  [HUDI-1493] Fixed schema compatibility check for fields. 
(#2350)
c6bf952 is described below

commit c6bf95233240d75160670ee96c356946c44c6169
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Dec 29 17:02:21 2020 -0800

    [HUDI-1493] Fixed schema compatibility check for fields. (#2350)
    
    Some field type changes are allowed (e.g. int -> long) while maintaining 
schema backward compatibility within HUDI. The check was reversed, with the 
reader schema being passed as the writer schema.
---
 .../hudi/client/TestTableSchemaEvolution.java      | 30 +++++++++++++++++++---
 .../hudi/common/table/TableSchemaResolver.java     |  8 +++---
 2 files changed, 32 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 8ee0c16..708a647 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -107,10 +107,33 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
swappedFieldSchema),
         "Swapped fields are not compatible");
 
-    String typeChangeSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + 
FARE_NESTED_SCHEMA
+    String typeChangeSchemaDisallowed = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + 
FARE_NESTED_SCHEMA
         + TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
-    assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
typeChangeSchema),
-        "Field type change is not compatible");
+    assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
typeChangeSchemaDisallowed),
+        "Incompatible field type change is not allowed");
+
+    // Array of allowed schema field type transitions
+    String[][] allowedFieldChanges = {
+        {"string", "bytes"}, {"bytes", "string"},
+        {"int", "long"}, {"int", "float"}, {"long", "float"},
+        {"int", "double"}, {"float", "double"}, {"long", "double"}};
+    for (String[] fieldChange : allowedFieldChanges) {
+      String fromSchema = TRIP_SCHEMA_PREFIX + 
EXTRA_FIELD_SCHEMA.replace("string", fieldChange[0]) + TRIP_SCHEMA_SUFFIX;
+      String toSchema = TRIP_SCHEMA_PREFIX + 
EXTRA_FIELD_SCHEMA.replace("string", fieldChange[1]) + TRIP_SCHEMA_SUFFIX;
+      assertTrue(TableSchemaResolver.isSchemaCompatible(fromSchema, toSchema),
+          "Compatible field type change is not allowed");
+      if (!fieldChange[0].equals("byte") && fieldChange[1].equals("byte")) {
+        assertFalse(TableSchemaResolver.isSchemaCompatible(toSchema, 
fromSchema),
+            "Incompatible field type change is allowed");
+      }
+    }
+
+    // Names and aliases should match
+    String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA + 
TRIP_SCHEMA_SUFFIX;
+    String toSchema = TRIP_SCHEMA_PREFIX.replace("triprec", "new_triprec") + 
EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
+    assertFalse(TableSchemaResolver.isSchemaCompatible(fromSchema, toSchema), 
"Field names should match");
+    assertFalse(TableSchemaResolver.isSchemaCompatible(toSchema, fromSchema), 
"Field names should match");
+
 
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED),
         "Added field with default is compatible (Evolved Schema)");
@@ -474,6 +497,7 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
         .build();
   }
 
+  @Override
   protected HoodieTableType getTableType() {
     return tableType;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 372b393..181edd3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -296,7 +296,7 @@ public class TableSchemaResolver {
   public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) 
{
     if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == 
Schema.Type.RECORD) {
       // record names must match:
-      if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) {
+      if (!SchemaCompatibility.schemaNameEquals(newSchema, oldSchema)) {
         return false;
       }
 
@@ -329,9 +329,11 @@ public class TableSchemaResolver {
       // All fields in the newSchema record can be populated from the 
oldSchema record
       return true;
     } else {
-      // Use the checks implemented by
+      // Use the checks implemented by Avro
+      // newSchema is the schema which will be used to read the records 
written earlier using oldSchema. Hence, in the
+      // check below, use newSchema as the reader schema and oldSchema as the 
writer schema.
       org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult 
=
-          
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, 
newSchema);
+          
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(newSchema, 
oldSchema);
       return compatResult.getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
     }
   }

Reply via email to