This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 655298911f0 [HUDI-7424] Throw conversion error of Avro record properly for error table (#10705)
655298911f0 is described below

commit 655298911f0e6e0fca80c8b3cafd333e55b252ea
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Feb 20 09:44:22 2024 -0800

    [HUDI-7424] Throw conversion error of Avro record properly for error table (#10705)
---
 .../utilities/streamer/HoodieStreamerUtils.java    | 24 ++++---
 .../streamer/TestHoodieStreamerUtils.java          | 84 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 44c367ba384..90315bc9764 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -35,7 +35,7 @@ import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -52,7 +52,6 @@ import org.apache.spark.sql.avro.HoodieAvroDeserializer;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -108,7 +107,7 @@ public class HoodieStreamerUtils {
                   if (!shouldErrorTable) {
                     throw e;
                   }
-                  
avroRecords.add(Either.right(HoodieAvroUtils.avroToJsonString(genRec, false)));
+                  avroRecords.add(generateErrorRecord(genRec));
                 }
               }
               return avroRecords.iterator();
@@ -139,11 +138,7 @@ public class HoodieStreamerUtils {
               if (!shouldErrorTable) {
                 throw e;
               }
-              try {
-                return Either.right(HoodieAvroUtils.avroToJsonString(rec, 
false));
-              } catch (IOException ex) {
-                throw new HoodieIOException("Failed to convert illegal record 
to json", ex);
-              }
+              return generateErrorRecord(rec);
             }
           });
 
@@ -159,6 +154,19 @@ public class HoodieStreamerUtils {
     });
   }
 
+  /**
+   * @param genRec Avro {@link GenericRecord} instance.
+   * @return the representation of error record (empty {@link HoodieRecord} 
and the error record
+   * String) for writing to error table.
+   */
+  private static Either<HoodieRecord, String> 
generateErrorRecord(GenericRecord genRec) {
+    try {
+      return Either.right(HoodieAvroUtils.avroToJsonString(genRec, false));
+    } catch (Exception ex) {
+      throw new HoodieException("Failed to convert illegal record to json", 
ex);
+    }
+  }
+
   /**
    * Set based on hoodie.datasource.write.drop.partition.columns config.
    * When set to true, will not write the partition columns into the table.
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
new file mode 100644
index 00000000000..19d7bb5da17
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link HoodieStreamerUtils}.
+ */
+public class TestHoodieStreamerUtils extends UtilitiesTestBase {
+  private static final String SCHEMA_STRING = "{\"type\": \"record\"," + 
"\"name\": \"rec\"," + "\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"long\"}," + "{\"name\": 
\"_row_key\", \"type\": \"string\"},"
+      + "{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], 
\"default\": null },"
+      + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": 
\"driver\", \"type\": \"string\"}]}";
+
+  @BeforeAll
+  public static void setupOnce() throws Exception {
+    initTestServices();
+  }
+
+  @ParameterizedTest
+  @EnumSource(HoodieRecordType.class)
+  public void testCreateHoodieRecordsWithError(HoodieRecordType recordType) {
+    Schema schema = new Schema.Parser().parse(SCHEMA_STRING);
+    JavaRDD<GenericRecord> recordRdd = 
jsc.parallelize(Collections.singletonList(1)).map(i -> {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, i * 1000L);
+      record.put(1, "key" + i);
+      record.put(2, "path" + i);
+      // The field is non-null in schema but the value is null, so this fails 
the Hudi record creation
+      record.put(3, null);
+      record.put(4, "driver");
+      return record;
+    });
+    HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+    TypedProperties props = new TypedProperties();
+    SchemaProvider schemaProvider = new SimpleSchemaProvider(jsc, schema, 
props);
+    BaseErrorTableWriter errorTableWriter = 
Mockito.mock(BaseErrorTableWriter.class);
+    SparkException exception = assertThrows(
+        SparkException.class,
+        () -> HoodieStreamerUtils.createHoodieRecords(cfg, props, 
Option.of(recordRdd),
+                schemaProvider, recordType, false, "000", 
Option.of(errorTableWriter))
+            .get().collect()
+    );
+    assertTrue(exception.getMessage().contains("Failed to convert illegal 
record to json"));
+  }
+}

Reply via email to