This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fdcd7dd52e7 [HUDI-9148] Refactor error handling for 
UtilHelpers#CreateSource (#13309)
fdcd7dd52e7 is described below

commit fdcd7dd52e741df29fdea6784490c23d5a400a23
Author: Alex R <[email protected]>
AuthorDate: Mon May 19 16:44:35 2025 -0700

    [HUDI-9148] Refactor error handling for UtilHelpers#CreateSource (#13309)
---
 .../org/apache/hudi/utilities/UtilHelpers.java     | 21 +++++++--
 .../org/apache/hudi/utilities/TestUtilHelpers.java | 52 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 4 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index d0297daf826..24a1b6ba05b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -116,6 +116,7 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
@@ -152,17 +153,29 @@ public class UtilHelpers {
     sourceConstructorAndArgs.add(Pair.of(constructorArgsMetrics, new Object[] 
{cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics}));
     sourceConstructorAndArgs.add(Pair.of(constructorArgs, new Object[] {cfg, 
jssc, sparkSession, streamContext.getSchemaProvider()}));
 
-    HoodieException sourceClassLoadException = null;
+    List<HoodieException> nonMatchingConstructorExceptions = new ArrayList<>();
     for (Pair<Class<?>[], Object[]> constructor : sourceConstructorAndArgs) {
       try {
         return (Source) ReflectionUtils.loadClass(sourceClass, 
constructor.getLeft(), constructor.getRight());
       } catch (HoodieException e) {
-        sourceClassLoadException = e;
+        if (e.getCause() instanceof NoSuchMethodException) {
+          // If the cause is a NoSuchMethodException, ignore
+          continue;
+        }
+        nonMatchingConstructorExceptions.add(e);
+        String constructorSignature = Arrays.stream(constructor.getLeft())
+            .map(Class::getSimpleName)
+            .collect(Collectors.joining(", ", "[", "]"));
+        LOG.error("Unexpected error while loading source class {} with 
constructor signature {}", sourceClass, constructorSignature, e);
       } catch (Throwable t) {
-        throw new IOException("Could not load source class " + sourceClass, t);
+        throw new IOException("Could not load source class due to unexpected 
error " + sourceClass, t);
       }
     }
-    throw new IOException("Could not load source class " + sourceClass, 
sourceClassLoadException);
+
+    // Rather than throw the last failure, we will only throw failures that 
did not occur due to NoSuchMethodException.
+    IOException ioe = new IOException("Could not load any source class for " + 
sourceClass);
+    nonMatchingConstructorExceptions.forEach(ioe::addSuppressed);
+    throw ioe;
   }
 
   public static JsonKafkaSourcePostProcessor 
createJsonKafkaSourcePostProcessor(String postProcessorClassNames, 
TypedProperties props) throws IOException {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
index 941757949fd..f8fb4fd29c2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -20,11 +20,27 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.utilities.sources.AvroKafkaSource;
+import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics;
 
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test cases for {@link UtilHelpers}.
@@ -41,4 +57,40 @@ public class TestUtilHelpers {
     UtilHelpers.addLockOptions("path2", "file", props2);
     assertEquals(1, props2.size(), "Should not add lock options if the lock 
provider is already there.");
   }
+
+  @Test
+  void testCreateSource() throws IOException {
+    SparkSession sparkSession = mock(SparkSession.class);
+    JavaSparkContext javaSparkContext = mock(JavaSparkContext.class);
+    TypedProperties typedProperties = new TypedProperties();
+    typedProperties.setProperty("hoodie.streamer.source.kafka.topic", "topic");
+    Source source = UtilHelpers.createSource(
+        "org.apache.hudi.utilities.sources.AvroKafkaSource",
+        typedProperties,
+        javaSparkContext,
+        sparkSession,
+        new HoodieStreamerMetrics(
+                HoodieWriteConfig.newBuilder().withPath("mypath").build(),
+                HoodieStorageUtils.getStorage(getDefaultStorageConf())),
+        new DefaultStreamContext(new SchemaTestProvider(typedProperties), 
Option.empty()));
+    assertTrue(source instanceof AvroKafkaSource);
+  }
+
+  @Test
+  void testCreateSourceWithErrors() {
+    SparkSession sparkSession = mock(SparkSession.class);
+    JavaSparkContext javaSparkContext = mock(JavaSparkContext.class);
+    TypedProperties typedProperties = new TypedProperties();
+    Throwable e = assertThrows(IOException.class, () -> 
UtilHelpers.createSource(
+        "org.apache.hudi.utilities.sources.AvroKafkaSource",
+        typedProperties,
+        javaSparkContext,
+        sparkSession,
+        new HoodieStreamerMetrics(
+                HoodieWriteConfig.newBuilder().withPath("mypath").build(),
+                HoodieStorageUtils.getStorage(getDefaultStorageConf())),
+        new DefaultStreamContext(new SchemaTestProvider(typedProperties), 
Option.empty())));
+    // We expect two constructors to complain about this error.
+    assertEquals(2, e.getSuppressed().length);
+  }
 }

Reply via email to