This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da18ab5db [HUDI-5191] Fix compatibility with avro 1.10 (#7175)
4da18ab5db is described below

commit 4da18ab5db839372d584dddacfdd562f722a5ba6
Author: Zouxxyy <[email protected]>
AuthorDate: Sat Nov 12 23:25:44 2022 +0800

    [HUDI-5191] Fix compatibility with avro 1.10 (#7175)
    
    [HUDI-5191] Fix avro compatibility with spark3.2+
---
 .github/workflows/bot.yml                          |  8 ++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 87 ++++++++++++++++------
 .../debezium/AbstractDebeziumAvroPayload.java      |  3 +-
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  | 73 ++++++++++++++----
 .../common/functional/TestHoodieLogFormat.java     |  2 +-
 ...tOverwriteNonDefaultsWithLatestAvroPayload.java |  8 +-
 .../common/model/TestPartialUpdateAvroPayload.java |  2 +-
 .../hudi/common/util/TestObjectSizeCalculator.java |  6 +-
 8 files changed, 146 insertions(+), 43 deletions(-)

diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 0a61fa2544..fca33bb700 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -73,6 +73,14 @@ jobs:
         run: |
           HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q 
-DforceStdout)
           ./packaging/bundle-validation/ci_run.sh $HUDI_VERSION
+      - name: Common Test
+        env:
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+        if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 
as it's covered by Azure CI
+        run:
+          mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" 
-D"$FLINK_PROFILE" '-Dtest=Test*' -pl hudi-common $MVN_ARGS
       - name: Spark SQL Test
         env:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 5288f7fa0c..2f226b2d46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -54,6 +54,8 @@ import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificRecordBase;
 
+import org.apache.hadoop.util.VersionUtil;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -89,6 +91,7 @@ import static 
org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
  */
 public class HoodieAvroUtils {
 
+  public static final String AVRO_VERSION = 
Schema.class.getPackage().getImplementationVersion();
   private static final ThreadLocal<BinaryEncoder> BINARY_ENCODER = 
ThreadLocal.withInitial(() -> null);
   private static final ThreadLocal<BinaryDecoder> BINARY_DECODER = 
ThreadLocal.withInitial(() -> null);
 
@@ -478,6 +481,32 @@ public class HoodieAvroUtils {
     return fieldName.split("\\.")[0];
   }
 
+  /**
+   * Obtain the value of the provided key, returning null if the record's schema has no such field
+   * (consistent with the behavior of avro versions before 1.10).
+   */
+  public static Object getFieldVal(GenericRecord record, String key) {
+    return getFieldVal(record, key, true);
+  }
+
+  /**
+   * Obtain the value of the provided key. If the record's schema has no such field, return null when
+   * returnNullIfNotFound is true; otherwise throw AvroRuntimeException, consistent with avro 1.10+.
+   */
+  public static Object getFieldVal(GenericRecord record, String key, boolean 
returnNullIfNotFound) {
+    if (record.getSchema().getField(key) == null) {
+      if (returnNullIfNotFound) {
+        return null;
+      } else {
+        // Since avro 1.10, avro throws AvroRuntimeException("Not a valid schema field: " + key)
+        // instead of returning null, as previous versions did, when the record doesn't contain the key.
+        // Here we simulate this behavior.
+        throw new AvroRuntimeException("Not a valid schema field: " + key);
+      }
+    } else {
+      return record.get(key);
+    }
+  }
+
   /**
    * Obtain value of the provided field as string, denoted by dot notation. 
e.g: a.b.c
    */
@@ -492,44 +521,50 @@ public class HoodieAvroUtils {
   public static Object getNestedFieldVal(GenericRecord record, String 
fieldName, boolean returnNullIfNotFound, boolean 
consistentLogicalTimestampEnabled) {
     String[] parts = fieldName.split("\\.");
     GenericRecord valueNode = record;
-    int i = 0;
-    try {
-      for (; i < parts.length; i++) {
-        String part = parts[i];
-        Object val = valueNode.get(part);
-        if (val == null) {
-          break;
+
+    for (int i = 0; i < parts.length; i++) {
+      String part = parts[i];
+      Object val;
+      try {
+        val = HoodieAvroUtils.getFieldVal(valueNode, part, 
returnNullIfNotFound);
+      } catch (AvroRuntimeException e) {
+        if (returnNullIfNotFound) {
+          return null;
+        } else {
+          throw new HoodieException(
+              fieldName + "(Part -" + parts[i] + ") field not found in record. 
Acceptable fields were :"
+                  + 
valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
         }
+      }
 
+      if (i == parts.length - 1) {
         // return, if last part of name
-        if (i == parts.length - 1) {
+        if (val == null) {
+          return null;
+        } else {
           Schema fieldSchema = valueNode.getSchema().getField(part).schema();
           return convertValueForSpecificDataTypes(fieldSchema, val, 
consistentLogicalTimestampEnabled);
-        } else {
-          // VC: Need a test here
-          if (!(val instanceof GenericRecord)) {
+        }
+      } else {
+        if (!(val instanceof GenericRecord)) {
+          if (returnNullIfNotFound) {
+            return null;
+          } else {
             throw new HoodieException("Cannot find a record at part value :" + 
part);
           }
+        } else {
           valueNode = (GenericRecord) val;
         }
       }
-    } catch (AvroRuntimeException e) {
-      // Since avro 1.10, arvo will throw AvroRuntimeException("Not a valid 
schema field: " + key)
-      // rather than return null like the previous version if if record 
doesn't contain this key.
-      // So when returnNullIfNotFound is true, catch this exception.
-      if (!returnNullIfNotFound) {
-        throw e;
-      }
     }
 
+    // This can only be reached if the length of parts is 0
     if (returnNullIfNotFound) {
       return null;
-    } else if (valueNode.getSchema().getField(parts[i]) == null) {
+    } else {
       throw new HoodieException(
-          fieldName + "(Part -" + parts[i] + ") field not found in record. 
Acceptable fields were :"
+          fieldName + " field not found in record. Acceptable fields were :"
               + 
valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
-    } else {
-      throw new HoodieException("The value of " + parts[i] + " can not be 
null");
     }
   }
 
@@ -1033,4 +1068,12 @@ public class HoodieAvroUtils {
   public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, 
Schema newSchema) {
     return rewriteRecordWithNewSchema(oldRecord, newSchema, 
Collections.EMPTY_MAP);
   }
+
+  public static boolean gteqAvro1_9() {
+    return VersionUtil.compareVersions(AVRO_VERSION, "1.9") >= 0;
+  }
+
+  public static boolean gteqAvro1_10() {
+    return VersionUtil.compareVersions(AVRO_VERSION, "1.10") >= 0;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
index 33f1d9f002..9082d572a4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model.debezium;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.Option;
 
@@ -76,7 +77,7 @@ public abstract class AbstractDebeziumAvroPayload extends 
OverwriteWithLatestAvr
     boolean delete = false;
     if (insertRecord instanceof GenericRecord) {
       GenericRecord record = (GenericRecord) insertRecord;
-      Object value = record.get(DebeziumConstants.FLATTENED_OP_COL_NAME);
+      Object value = HoodieAvroUtils.getFieldVal(record, 
DebeziumConstants.FLATTENED_OP_COL_NAME);
       delete = value != null && 
value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
     }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 3371085d21..483c49b1f5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -20,12 +20,15 @@ package org.apache.hudi.avro;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.SchemaCompatibilityException;
 
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Assertions;
 
@@ -253,7 +256,12 @@ public class TestHoodieAvroUtils {
     assertEquals("key1", rec1.get("_row_key"));
     assertEquals("val1", rec1.get("non_pii_col"));
     assertEquals(3.5, rec1.get("timestamp"));
-    assertNull(rec1.get("pii_col"));
+    if (HoodieAvroUtils.gteqAvro1_10()) {
+      GenericRecord finalRec1 = rec1;
+      assertThrows(AvroRuntimeException.class, () -> finalRec1.get("pii_col"));
+    } else {
+      assertNull(rec1.get("pii_col"));
+    }
     assertEquals(expectedSchema, rec1.getSchema());
 
     // non-partitioned table test with empty list of fields.
@@ -287,19 +295,58 @@ public class TestHoodieAvroUtils {
     assertNull(rowKeyNotExist);
 
     // Field does not exist
-    try {
-      HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false);
-    } catch (Exception e) {
-      assertEquals("fake_key(Part -fake_key) field not found in record. 
Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
-          e.getMessage());
-    }
+    assertEquals("fake_key(Part -fake_key) field not found in record. 
Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
+        assertThrows(HoodieException.class, () ->
+            HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, 
false)).getMessage());
 
-    // Field exist while value not
-    try {
-      HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false);
-    } catch (Exception e) {
-      assertEquals("The value of timestamp can not be null", e.getMessage());
-    }
+    // Field exists but its value is null
+    assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, 
false));
+  }
+
+  @Test
+  public void testGetNestedFieldValWithNestedField() {
+    Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
+    GenericRecord rec = new GenericData.Record(nestedSchema);
+
+    // test get .
+    assertEquals(". field not found in record. Acceptable fields were 
:[firstname, lastname, student]",
+        assertThrows(HoodieException.class, () ->
+            HoodieAvroUtils.getNestedFieldVal(rec, ".", false, 
false)).getMessage());
+
+    // test get fake_key
+    assertEquals("fake_key(Part -fake_key) field not found in record. 
Acceptable fields were :[firstname, lastname, student]",
+        assertThrows(HoodieException.class, () ->
+            HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, 
false)).getMessage());
+
+    // test get student(null)
+    assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student", false, 
false));
+
+    // test get student
+    GenericRecord studentRecord = new 
GenericData.Record(rec.getSchema().getField("student").schema());
+    studentRecord.put("firstname", "person");
+    rec.put("student", studentRecord);
+    assertEquals(studentRecord, HoodieAvroUtils.getNestedFieldVal(rec, 
"student", false, false));
+
+    // test get student.fake_key
+    assertEquals("student.fake_key(Part -fake_key) field not found in record. 
Acceptable fields were :[firstname, lastname]",
+        assertThrows(HoodieException.class, () ->
+            HoodieAvroUtils.getNestedFieldVal(rec, "student.fake_key", false, 
false)).getMessage());
+
+    // test get student.firstname
+    assertEquals("person", HoodieAvroUtils.getNestedFieldVal(rec, 
"student.firstname", false, false));
+
+    // test get student.lastname(null)
+    assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student.lastname", 
false, false));
+
+    // test get student.firstname.fake_key
+    assertEquals("Cannot find a record at part value :firstname",
+        assertThrows(HoodieException.class, () ->
+            HoodieAvroUtils.getNestedFieldVal(rec, 
"student.firstname.fake_key", false, false)).getMessage());
+
+    // test get student.lastname(null).fake_key
+    assertEquals("Cannot find a record at part value :lastname",
+        assertThrows(HoodieException.class, () ->
+            HoodieAvroUtils.getNestedFieldVal(rec, 
"student.lastname.fake_key", false, false)).getMessage());
   }
 
   @Test
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 1c58727b39..f16eb52d36 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2347,7 +2347,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
           new HashMap<HoodieLogBlockType, Integer>() {{
             put(HoodieLogBlockType.AVRO_DATA_BLOCK, 0); // not supported
             put(HoodieLogBlockType.HFILE_DATA_BLOCK, 0); // not supported
-            put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605);
+            put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 
HoodieAvroUtils.gteqAvro1_9() ? 2593 : 2605);
           }};
 
       List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
index 0807b41f61..4b7e4bda0b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
@@ -181,10 +181,10 @@ public class 
TestOverwriteNonDefaultsWithLatestAvroPayload {
   @Test
   public void testNullColumn() throws IOException {
     Schema avroSchema = Schema.createRecord(Arrays.asList(
-            new Schema.Field("id", 
Schema.createUnion(Schema.create(Schema.Type.STRING), 
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
-            new Schema.Field("name", 
Schema.createUnion(Schema.create(Schema.Type.STRING), 
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
-            new Schema.Field("age", 
Schema.createUnion(Schema.create(Schema.Type.STRING), 
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
-            new Schema.Field("job", 
Schema.createUnion(Schema.create(Schema.Type.STRING), 
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE)
+            new Schema.Field("id", 
Schema.createUnion(Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
+            new Schema.Field("name", 
Schema.createUnion(Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
+            new Schema.Field("age", 
Schema.createUnion(Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
+            new Schema.Field("job", 
Schema.createUnion(Schema.create(Schema.Type.NULL), 
Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE)
             ));
     GenericRecord record1 = new GenericData.Record(avroSchema);
     record1.put("id", "1");
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
index d7e1a6146a..b64b289abf 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
@@ -55,7 +55,7 @@ public class TestPartialUpdateAvroPayload {
       + "    {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
       + "    {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n"
       + "    {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
-      + "    {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\", 
\"boolean\"], \"default\":false},\n"
+      + "    {\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", 
\"default\": false},\n"
       + "    {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n"
       + "    {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\", 
\"items\": \"string\"}]}\n"
       + "  ]\n"
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
index 712f4b85f8..625e301984 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 
 import org.apache.avro.Schema;
@@ -73,7 +74,10 @@ public class TestObjectSizeCalculator {
     assertEquals(32, getObjectSize(emptyClass));
     assertEquals(40, getObjectSize(stringClass));
     assertEquals(40, getObjectSize(payloadClass));
-    assertEquals(1240, getObjectSize(Schema.create(Schema.Type.STRING)));
+    // Since avro 1.9, Schema uses ConcurrentHashMap instead of LinkedHashMap to
+    // implement props, which changes the size of the object.
+    assertEquals(HoodieAvroUtils.gteqAvro1_9() ? 1320 : 1240,
+        getObjectSize(Schema.create(Schema.Type.STRING)));
     assertEquals(104, getObjectSize(person));
   }
 

Reply via email to