This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4da18ab5db [HUDI-5191] Fix compatibility with avro 1.10 (#7175)
4da18ab5db is described below
commit 4da18ab5db839372d584dddacfdd562f722a5ba6
Author: Zouxxyy <[email protected]>
AuthorDate: Sat Nov 12 23:25:44 2022 +0800
[HUDI-5191] Fix compatibility with avro 1.10 (#7175)
[HUDI-5191] Fix avro compatibility with spark3.2+
---
.github/workflows/bot.yml | 8 ++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 87 ++++++++++++++++------
.../debezium/AbstractDebeziumAvroPayload.java | 3 +-
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 73 ++++++++++++++----
.../common/functional/TestHoodieLogFormat.java | 2 +-
...tOverwriteNonDefaultsWithLatestAvroPayload.java | 8 +-
.../common/model/TestPartialUpdateAvroPayload.java | 2 +-
.../hudi/common/util/TestObjectSizeCalculator.java | 6 +-
8 files changed, 146 insertions(+), 43 deletions(-)
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 0a61fa2544..fca33bb700 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -73,6 +73,14 @@ jobs:
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION
+ - name: Common Test
+ env:
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4
as it's covered by Azure CI
+ run:
+ mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-D"$FLINK_PROFILE" '-Dtest=Test*' -pl hudi-common $MVN_ARGS
- name: Spark SQL Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 5288f7fa0c..2f226b2d46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -54,6 +54,8 @@ import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.util.VersionUtil;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -89,6 +91,7 @@ import static
org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
*/
public class HoodieAvroUtils {
+ public static final String AVRO_VERSION =
Schema.class.getPackage().getImplementationVersion();
private static final ThreadLocal<BinaryEncoder> BINARY_ENCODER =
ThreadLocal.withInitial(() -> null);
private static final ThreadLocal<BinaryDecoder> BINARY_DECODER =
ThreadLocal.withInitial(() -> null);
@@ -478,6 +481,32 @@ public class HoodieAvroUtils {
return fieldName.split("\\.")[0];
}
+ /**
+ * Obtain value of the provided key; the behavior is consistent with avro
before 1.10
+ */
+ public static Object getFieldVal(GenericRecord record, String key) {
+ return getFieldVal(record, key, true);
+ }
+
+ /**
+ * Obtain value of the provided key; when returnNullIfNotFound is set to false,
+ * the behavior is consistent with avro after 1.10
+ */
+ public static Object getFieldVal(GenericRecord record, String key, boolean
returnNullIfNotFound) {
+ if (record.getSchema().getField(key) == null) {
+ if (returnNullIfNotFound) {
+ return null;
+ } else {
+ // Since avro 1.10, avro will throw AvroRuntimeException("Not a valid
schema field: " + key)
+ // rather than return null like the previous version if record doesn't
contain this key.
+ // Here we simulate this behavior.
+ throw new AvroRuntimeException("Not a valid schema field: " + key);
+ }
+ } else {
+ return record.get(key);
+ }
+ }
+
/**
* Obtain value of the provided field as string, denoted by dot notation.
e.g: a.b.c
*/
@@ -492,44 +521,50 @@ public class HoodieAvroUtils {
public static Object getNestedFieldVal(GenericRecord record, String
fieldName, boolean returnNullIfNotFound, boolean
consistentLogicalTimestampEnabled) {
String[] parts = fieldName.split("\\.");
GenericRecord valueNode = record;
- int i = 0;
- try {
- for (; i < parts.length; i++) {
- String part = parts[i];
- Object val = valueNode.get(part);
- if (val == null) {
- break;
+
+ for (int i = 0; i < parts.length; i++) {
+ String part = parts[i];
+ Object val;
+ try {
+ val = HoodieAvroUtils.getFieldVal(valueNode, part,
returnNullIfNotFound);
+ } catch (AvroRuntimeException e) {
+ if (returnNullIfNotFound) {
+ return null;
+ } else {
+ throw new HoodieException(
+ fieldName + "(Part -" + parts[i] + ") field not found in record.
Acceptable fields were :"
+ +
valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
}
+ }
+ if (i == parts.length - 1) {
// return, if last part of name
- if (i == parts.length - 1) {
+ if (val == null) {
+ return null;
+ } else {
Schema fieldSchema = valueNode.getSchema().getField(part).schema();
return convertValueForSpecificDataTypes(fieldSchema, val,
consistentLogicalTimestampEnabled);
- } else {
- // VC: Need a test here
- if (!(val instanceof GenericRecord)) {
+ }
+ } else {
+ if (!(val instanceof GenericRecord)) {
+ if (returnNullIfNotFound) {
+ return null;
+ } else {
throw new HoodieException("Cannot find a record at part value :" +
part);
}
+ } else {
valueNode = (GenericRecord) val;
}
}
- } catch (AvroRuntimeException e) {
- // Since avro 1.10, arvo will throw AvroRuntimeException("Not a valid
schema field: " + key)
- // rather than return null like the previous version if if record
doesn't contain this key.
- // So when returnNullIfNotFound is true, catch this exception.
- if (!returnNullIfNotFound) {
- throw e;
- }
}
+ // This can only be reached if the length of parts is 0
if (returnNullIfNotFound) {
return null;
- } else if (valueNode.getSchema().getField(parts[i]) == null) {
+ } else {
throw new HoodieException(
- fieldName + "(Part -" + parts[i] + ") field not found in record.
Acceptable fields were :"
+ fieldName + " field not found in record. Acceptable fields were :"
+
valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
- } else {
- throw new HoodieException("The value of " + parts[i] + " can not be
null");
}
}
@@ -1033,4 +1068,12 @@ public class HoodieAvroUtils {
public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord,
Schema newSchema) {
return rewriteRecordWithNewSchema(oldRecord, newSchema,
Collections.EMPTY_MAP);
}
+
+ public static boolean gteqAvro1_9() {
+ return VersionUtil.compareVersions(AVRO_VERSION, "1.9") >= 0;
+ }
+
+ public static boolean gteqAvro1_10() {
+ return VersionUtil.compareVersions(AVRO_VERSION, "1.10") >= 0;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
index 33f1d9f002..9082d572a4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.model.debezium;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
@@ -76,7 +77,7 @@ public abstract class AbstractDebeziumAvroPayload extends
OverwriteWithLatestAvr
boolean delete = false;
if (insertRecord instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertRecord;
- Object value = record.get(DebeziumConstants.FLATTENED_OP_COL_NAME);
+ Object value = HoodieAvroUtils.getFieldVal(record,
DebeziumConstants.FLATTENED_OP_COL_NAME);
delete = value != null &&
value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 3371085d21..483c49b1f5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -20,12 +20,15 @@ package org.apache.hudi.avro;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
@@ -253,7 +256,12 @@ public class TestHoodieAvroUtils {
assertEquals("key1", rec1.get("_row_key"));
assertEquals("val1", rec1.get("non_pii_col"));
assertEquals(3.5, rec1.get("timestamp"));
- assertNull(rec1.get("pii_col"));
+ if (HoodieAvroUtils.gteqAvro1_10()) {
+ GenericRecord finalRec1 = rec1;
+ assertThrows(AvroRuntimeException.class, () -> finalRec1.get("pii_col"));
+ } else {
+ assertNull(rec1.get("pii_col"));
+ }
assertEquals(expectedSchema, rec1.getSchema());
// non-partitioned table test with empty list of fields.
@@ -287,19 +295,58 @@ public class TestHoodieAvroUtils {
assertNull(rowKeyNotExist);
// Field does not exist
- try {
- HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false);
- } catch (Exception e) {
- assertEquals("fake_key(Part -fake_key) field not found in record.
Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
- e.getMessage());
- }
+ assertEquals("fake_key(Part -fake_key) field not found in record.
Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
+ assertThrows(HoodieException.class, () ->
+ HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false,
false)).getMessage());
- // Field exist while value not
- try {
- HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false);
- } catch (Exception e) {
- assertEquals("The value of timestamp can not be null", e.getMessage());
- }
+ // Field exists but its value is null
+ assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false,
false));
+ }
+
+ @Test
+ public void testGetNestedFieldValWithNestedField() {
+ Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
+ GenericRecord rec = new GenericData.Record(nestedSchema);
+
+ // test get .
+ assertEquals(". field not found in record. Acceptable fields were
:[firstname, lastname, student]",
+ assertThrows(HoodieException.class, () ->
+ HoodieAvroUtils.getNestedFieldVal(rec, ".", false,
false)).getMessage());
+
+ // test get fake_key
+ assertEquals("fake_key(Part -fake_key) field not found in record.
Acceptable fields were :[firstname, lastname, student]",
+ assertThrows(HoodieException.class, () ->
+ HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false,
false)).getMessage());
+
+ // test get student(null)
+ assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student", false,
false));
+
+ // test get student
+ GenericRecord studentRecord = new
GenericData.Record(rec.getSchema().getField("student").schema());
+ studentRecord.put("firstname", "person");
+ rec.put("student", studentRecord);
+ assertEquals(studentRecord, HoodieAvroUtils.getNestedFieldVal(rec,
"student", false, false));
+
+ // test get student.fake_key
+ assertEquals("student.fake_key(Part -fake_key) field not found in record.
Acceptable fields were :[firstname, lastname]",
+ assertThrows(HoodieException.class, () ->
+ HoodieAvroUtils.getNestedFieldVal(rec, "student.fake_key", false,
false)).getMessage());
+
+ // test get student.firstname
+ assertEquals("person", HoodieAvroUtils.getNestedFieldVal(rec,
"student.firstname", false, false));
+
+ // test get student.lastname(null)
+ assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student.lastname",
false, false));
+
+ // test get student.firstname.fake_key
+ assertEquals("Cannot find a record at part value :firstname",
+ assertThrows(HoodieException.class, () ->
+ HoodieAvroUtils.getNestedFieldVal(rec,
"student.firstname.fake_key", false, false)).getMessage());
+
+ // test get student.lastname(null).fake_key
+ assertEquals("Cannot find a record at part value :lastname",
+ assertThrows(HoodieException.class, () ->
+ HoodieAvroUtils.getNestedFieldVal(rec,
"student.lastname.fake_key", false, false)).getMessage());
}
@Test
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 1c58727b39..f16eb52d36 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2347,7 +2347,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
new HashMap<HoodieLogBlockType, Integer>() {{
put(HoodieLogBlockType.AVRO_DATA_BLOCK, 0); // not supported
put(HoodieLogBlockType.HFILE_DATA_BLOCK, 0); // not supported
- put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605);
+ put(HoodieLogBlockType.PARQUET_DATA_BLOCK,
HoodieAvroUtils.gteqAvro1_9() ? 2593 : 2605);
}};
List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
index 0807b41f61..4b7e4bda0b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
@@ -181,10 +181,10 @@ public class
TestOverwriteNonDefaultsWithLatestAvroPayload {
@Test
public void testNullColumn() throws IOException {
Schema avroSchema = Schema.createRecord(Arrays.asList(
- new Schema.Field("id",
Schema.createUnion(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
- new Schema.Field("name",
Schema.createUnion(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
- new Schema.Field("age",
Schema.createUnion(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
- new Schema.Field("job",
Schema.createUnion(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE)
+ new Schema.Field("id",
Schema.createUnion(Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
+ new Schema.Field("name",
Schema.createUnion(Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
+ new Schema.Field("age",
Schema.createUnion(Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
+ new Schema.Field("job",
Schema.createUnion(Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE)
));
GenericRecord record1 = new GenericData.Record(avroSchema);
record1.put("id", "1");
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
index d7e1a6146a..b64b289abf 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
@@ -55,7 +55,7 @@ public class TestPartialUpdateAvroPayload {
+ " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
- + " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\",
\"boolean\"], \"default\":false},\n"
+ + " {\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\",
\"default\": false},\n"
+ " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\",
\"items\": \"string\"}]}\n"
+ " ]\n"
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
index 712f4b85f8..625e301984 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.avro.Schema;
@@ -73,7 +74,10 @@ public class TestObjectSizeCalculator {
assertEquals(32, getObjectSize(emptyClass));
assertEquals(40, getObjectSize(stringClass));
assertEquals(40, getObjectSize(payloadClass));
- assertEquals(1240, getObjectSize(Schema.create(Schema.Type.STRING)));
+ // Since avro 1.9, Schema use ConcurrentHashMap instead of LinkedHashMap to
+ // implement props, which will change the size of the object.
+ assertEquals(HoodieAvroUtils.gteqAvro1_9() ? 1320 : 1240,
+ getObjectSize(Schema.create(Schema.Type.STRING)));
assertEquals(104, getObjectSize(person));
}