This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d41541cb9f8 [MINOR] Handle cases of malformed records when converting to json (#10943)
d41541cb9f8 is described below

commit d41541cb9f8509521945271115301345d2153fda
Author: Tim Brown <[email protected]>
AuthorDate: Fri Apr 5 20:07:07 2024 -0500

    [MINOR] Handle cases of malformed records when converting to json (#10943)
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  2 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 14 +++++++++++++
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  | 23 ++++++++++++++++++++++
 .../apache/hudi/utilities/streamer/ErrorEvent.java | 19 ++++++++++++++++++
 .../utilities/streamer/HoodieStreamerUtils.java    |  2 +-
 .../streamer/TestHoodieStreamerUtils.java          | 22 +++++++++++----------
 6 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 6de5de8842e..3393da6bd83 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -223,7 +223,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
         val transform: GenericRecord => Either[GenericRecord, String] = record 
=> try {
           Left(HoodieAvroUtils.rewriteRecordDeep(record, schema, true))
         } catch {
-          case _: Throwable => Right(HoodieAvroUtils.avroToJsonString(record, 
false))
+          case _: Throwable => 
Right(HoodieAvroUtils.safeAvroToJsonString(record))
         }
         recs.map(transform)
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a7b3f5ae197..2172c7b1ae0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -197,6 +197,20 @@ public class HoodieAvroUtils {
     return avroToJsonHelper(record, pretty).toString();
   }
 
+  /**
+   * Convert a given avro record to a JSON string. If the record contents are 
invalid, return the record.toString().
+   * Use this method over {@link HoodieAvroUtils#avroToJsonString} when simply 
trying to print the record contents without any guarantees around their 
correctness.
+   * @param record The GenericRecord to convert
+   * @return a JSON string
+   */
+  public static String safeAvroToJsonString(GenericRecord record) {
+    try {
+      return avroToJsonString(record, false);
+    } catch (Exception e) {
+      return record.toString();
+    }
+  }
+
   /**
    * Convert a given avro record to json and return the encoded bytes.
    *
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index eb20081475f..f1e5f606602 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -629,4 +629,27 @@ public class TestHoodieAvroUtils {
     assertEquals("custom_schema_property_value", 
schemaWithMetadata.getProp("custom_schema_property"));
     assertEquals("value", 
originalFieldsInUpdatedSchema.get(0).getProp("custom_field_property"));
   }
+
+  @Test
+  void testSafeAvroToJsonStringMissingRequiredField() {
+    Schema schema = new Schema.Parser().parse(EXAMPLE_SCHEMA);
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("non_pii_col", "val1");
+    record.put("pii_col", "val2");
+    record.put("timestamp", 3.5);
+    String jsonString = HoodieAvroUtils.safeAvroToJsonString(record);
+    assertEquals("{\"timestamp\": 3.5, \"_row_key\": null, \"non_pii_col\": 
\"val1\", \"pii_col\": \"val2\"}", jsonString);
+  }
+
+  @Test
+  void testSafeAvroToJsonStringBadDataType() {
+    Schema schema = new Schema.Parser().parse(EXAMPLE_SCHEMA);
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("non_pii_col", "val1");
+    record.put("_row_key", "key");
+    record.put("pii_col", "val2");
+    record.put("timestamp", "foo");
+    String jsonString = HoodieAvroUtils.safeAvroToJsonString(record);
+    assertEquals("{\"timestamp\": \"foo\", \"_row_key\": \"key\", 
\"non_pii_col\": \"val1\", \"pii_col\": \"val2\"}", jsonString);
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
index f268464d6f1..a2f1cb277ec 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.utilities.streamer;
 
+import java.util.Objects;
+
 /**
  * Error event is an event triggered during write or processing failure of a 
record.
  */
@@ -40,6 +42,23 @@ public class ErrorEvent<T> {
     return reason;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ErrorEvent<?> that = (ErrorEvent<?>) o;
+    return reason == that.reason && Objects.equals(payload, that.payload);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(reason, payload);
+  }
+
   /**
    * The reason behind write or processing failure of a record
    */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 61d7793e6ad..2ecf0b02fb6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -167,7 +167,7 @@ public class HoodieStreamerUtils {
       }
     }
     try {
-      return Either.right(HoodieAvroUtils.avroToJsonString(genRec, false));
+      return Either.right(HoodieAvroUtils.safeAvroToJsonString(genRec));
     } catch (Exception ex) {
       throw new HoodieException("Failed to convert illegal record to json", 
ex);
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
index 19d7bb5da17..e6c388b3e3b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
@@ -29,17 +29,18 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.util.Collections;
+import java.util.List;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doNothing;
 
 /**
  * Tests {@link HoodieStreamerUtils}.
@@ -73,12 +74,13 @@ public class TestHoodieStreamerUtils extends UtilitiesTestBase {
     TypedProperties props = new TypedProperties();
     SchemaProvider schemaProvider = new SimpleSchemaProvider(jsc, schema, 
props);
     BaseErrorTableWriter errorTableWriter = 
Mockito.mock(BaseErrorTableWriter.class);
-    SparkException exception = assertThrows(
-        SparkException.class,
-        () -> HoodieStreamerUtils.createHoodieRecords(cfg, props, 
Option.of(recordRdd),
-                schemaProvider, recordType, false, "000", 
Option.of(errorTableWriter))
-            .get().collect()
-    );
-    assertTrue(exception.getMessage().contains("Failed to convert illegal 
record to json"));
+    ArgumentCaptor<JavaRDD<?>> errorEventCaptor = 
ArgumentCaptor.forClass(JavaRDD.class);
+    
doNothing().when(errorTableWriter).addErrorEvents(errorEventCaptor.capture());
+    HoodieStreamerUtils.createHoodieRecords(cfg, props, Option.of(recordRdd),
+                schemaProvider, recordType, false, "000", 
Option.of(errorTableWriter));
+    List<ErrorEvent<String>> actualErrorEvents = (List<ErrorEvent<String>>) 
errorEventCaptor.getValue().collect();
+    ErrorEvent<String> expectedErrorEvent = new ErrorEvent<>("{\"timestamp\": 
1000, \"_row_key\": \"key1\", \"partition_path\": \"path1\", \"rider\": null, 
\"driver\": \"driver\"}",
+        ErrorEvent.ErrorReason.RECORD_CREATION);
+    assertEquals(Collections.singletonList(expectedErrorEvent), 
actualErrorEvents);
   }
 }

Reply via email to