This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d41541cb9f8 [MINOR] Handle cases of malformed records when converting to json (#10943)
d41541cb9f8 is described below
commit d41541cb9f8509521945271115301345d2153fda
Author: Tim Brown <[email protected]>
AuthorDate: Fri Apr 5 20:07:07 2024 -0500
[MINOR] Handle cases of malformed records when converting to json (#10943)
---
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 2 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 14 +++++++++++++
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 23 ++++++++++++++++++++++
.../apache/hudi/utilities/streamer/ErrorEvent.java | 19 ++++++++++++++++++
.../utilities/streamer/HoodieStreamerUtils.java | 2 +-
.../streamer/TestHoodieStreamerUtils.java | 22 +++++++++++----------
6 files changed, 70 insertions(+), 12 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 6de5de8842e..3393da6bd83 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -223,7 +223,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
val transform: GenericRecord => Either[GenericRecord, String] = record => try {
Left(HoodieAvroUtils.rewriteRecordDeep(record, schema, true))
} catch {
- case _: Throwable => Right(HoodieAvroUtils.avroToJsonString(record, false))
+ case _: Throwable => Right(HoodieAvroUtils.safeAvroToJsonString(record))
}
recs.map(transform)
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a7b3f5ae197..2172c7b1ae0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -197,6 +197,20 @@ public class HoodieAvroUtils {
return avroToJsonHelper(record, pretty).toString();
}
+ /**
+ * Convert a given avro record to a JSON string. If the record contents are invalid, return the record.toString().
+ * Use this method over {@link HoodieAvroUtils#avroToJsonString} when simply trying to print the record contents without any guarantees around their correctness.
+ * @param record The GenericRecord to convert
+ * @return a JSON string
+ */
+ public static String safeAvroToJsonString(GenericRecord record) {
+ try {
+ return avroToJsonString(record, false);
+ } catch (Exception e) {
+ return record.toString();
+ }
+ }
+
/**
* Convert a given avro record to json and return the encoded bytes.
*
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index eb20081475f..f1e5f606602 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -629,4 +629,27 @@ public class TestHoodieAvroUtils {
assertEquals("custom_schema_property_value", schemaWithMetadata.getProp("custom_schema_property"));
assertEquals("value", originalFieldsInUpdatedSchema.get(0).getProp("custom_field_property"));
}
+
+ @Test
+ void testSafeAvroToJsonStringMissingRequiredField() {
+ Schema schema = new Schema.Parser().parse(EXAMPLE_SCHEMA);
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("non_pii_col", "val1");
+ record.put("pii_col", "val2");
+ record.put("timestamp", 3.5);
+ String jsonString = HoodieAvroUtils.safeAvroToJsonString(record);
+ assertEquals("{\"timestamp\": 3.5, \"_row_key\": null, \"non_pii_col\": \"val1\", \"pii_col\": \"val2\"}", jsonString);
+ }
+
+ @Test
+ void testSafeAvroToJsonStringBadDataType() {
+ Schema schema = new Schema.Parser().parse(EXAMPLE_SCHEMA);
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("non_pii_col", "val1");
+ record.put("_row_key", "key");
+ record.put("pii_col", "val2");
+ record.put("timestamp", "foo");
+ String jsonString = HoodieAvroUtils.safeAvroToJsonString(record);
+ assertEquals("{\"timestamp\": \"foo\", \"_row_key\": \"key\", \"non_pii_col\": \"val1\", \"pii_col\": \"val2\"}", jsonString);
+ }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
index f268464d6f1..a2f1cb277ec 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
@@ -19,6 +19,8 @@
package org.apache.hudi.utilities.streamer;
+import java.util.Objects;
+
/**
* Error event is an event triggered during write or processing failure of a record.
*/
@@ -40,6 +42,23 @@ public class ErrorEvent<T> {
return reason;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ErrorEvent<?> that = (ErrorEvent<?>) o;
+ return reason == that.reason && Objects.equals(payload, that.payload);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(reason, payload);
+ }
+
/**
* The reason behind write or processing failure of a record
*/
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 61d7793e6ad..2ecf0b02fb6 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -167,7 +167,7 @@ public class HoodieStreamerUtils {
}
}
try {
- return Either.right(HoodieAvroUtils.avroToJsonString(genRec, false));
+ return Either.right(HoodieAvroUtils.safeAvroToJsonString(genRec));
} catch (Exception ex) {
throw new HoodieException("Failed to convert illegal record to json", ex);
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
index 19d7bb5da17..e6c388b3e3b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
@@ -29,17 +29,18 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.Collections;
+import java.util.List;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doNothing;
/**
* Tests {@link HoodieStreamerUtils}.
@@ -73,12 +74,13 @@ public class TestHoodieStreamerUtils extends UtilitiesTestBase {
TypedProperties props = new TypedProperties();
SchemaProvider schemaProvider = new SimpleSchemaProvider(jsc, schema, props);
BaseErrorTableWriter errorTableWriter = Mockito.mock(BaseErrorTableWriter.class);
- SparkException exception = assertThrows(
- SparkException.class,
- () -> HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
- schemaProvider, recordType, false, "000", Option.of(errorTableWriter))
- .get().collect()
- );
- assertTrue(exception.getMessage().contains("Failed to convert illegal record to json"));
+ ArgumentCaptor<JavaRDD<?>> errorEventCaptor = ArgumentCaptor.forClass(JavaRDD.class);
+ doNothing().when(errorTableWriter).addErrorEvents(errorEventCaptor.capture());
+ HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
+ schemaProvider, recordType, false, "000", Option.of(errorTableWriter));
+ List<ErrorEvent<String>> actualErrorEvents = (List<ErrorEvent<String>>) errorEventCaptor.getValue().collect();
+ ErrorEvent<String> expectedErrorEvent = new ErrorEvent<>("{\"timestamp\": 1000, \"_row_key\": \"key1\", \"partition_path\": \"path1\", \"rider\": null, \"driver\": \"driver\"}",
+ ErrorEvent.ErrorReason.RECORD_CREATION);
+ assertEquals(Collections.singletonList(expectedErrorEvent), actualErrorEvents);
}
}