This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fdcd7dd52e7 [HUDI-9148] Refactor error handling for UtilHelpers#CreateSource (#13309)
fdcd7dd52e7 is described below
commit fdcd7dd52e741df29fdea6784490c23d5a400a23
Author: Alex R <[email protected]>
AuthorDate: Mon May 19 16:44:35 2025 -0700
[HUDI-9148] Refactor error handling for UtilHelpers#CreateSource (#13309)
---
.../org/apache/hudi/utilities/UtilHelpers.java | 21 +++++++--
.../org/apache/hudi/utilities/TestUtilHelpers.java | 52 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 4 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index d0297daf826..24a1b6ba05b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -116,6 +116,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
@@ -152,17 +153,29 @@ public class UtilHelpers {
sourceConstructorAndArgs.add(Pair.of(constructorArgsMetrics, new Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics}));
sourceConstructorAndArgs.add(Pair.of(constructorArgs, new Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider()}));
- HoodieException sourceClassLoadException = null;
+ List<HoodieException> nonMatchingConstructorExceptions = new ArrayList<>();
for (Pair<Class<?>[], Object[]> constructor : sourceConstructorAndArgs) {
try {
return (Source) ReflectionUtils.loadClass(sourceClass, constructor.getLeft(), constructor.getRight());
} catch (HoodieException e) {
- sourceClassLoadException = e;
+ if (e.getCause() instanceof NoSuchMethodException) {
+ // If the cause is a NoSuchMethodException, ignore
+ continue;
+ }
+ nonMatchingConstructorExceptions.add(e);
+ String constructorSignature = Arrays.stream(constructor.getLeft())
+ .map(Class::getSimpleName)
+ .collect(Collectors.joining(", ", "[", "]"));
+ LOG.error("Unexpected error while loading source class {} with constructor signature {}", sourceClass, constructorSignature, e);
} catch (Throwable t) {
- throw new IOException("Could not load source class " + sourceClass, t);
+ throw new IOException("Could not load source class due to unexpected error " + sourceClass, t);
}
}
- throw new IOException("Could not load source class " + sourceClass, sourceClassLoadException);
+
+ // Rather than throw the last failure, we will only throw failures that did not occur due to NoSuchMethodException.
+ IOException ioe = new IOException("Could not load any source class for " + sourceClass);
+ nonMatchingConstructorExceptions.forEach(ioe::addSuppressed);
+ throw ioe;
}
public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
index 941757949fd..f8fb4fd29c2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
@@ -20,11 +20,27 @@ package org.apache.hudi.utilities;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.utilities.sources.AvroKafkaSource;
+import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.HoodieStreamerMetrics;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+
+import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
/**
* Test cases for {@link UtilHelpers}.
@@ -41,4 +57,40 @@ public class TestUtilHelpers {
UtilHelpers.addLockOptions("path2", "file", props2);
assertEquals(1, props2.size(), "Should not add lock options if the lock provider is already there.");
}
+
+ @Test
+ void testCreateSource() throws IOException {
+ SparkSession sparkSession = mock(SparkSession.class);
+ JavaSparkContext javaSparkContext = mock(JavaSparkContext.class);
+ TypedProperties typedProperties = new TypedProperties();
+ typedProperties.setProperty("hoodie.streamer.source.kafka.topic", "topic");
+ Source source = UtilHelpers.createSource(
+ "org.apache.hudi.utilities.sources.AvroKafkaSource",
+ typedProperties,
+ javaSparkContext,
+ sparkSession,
+ new HoodieStreamerMetrics(
+ HoodieWriteConfig.newBuilder().withPath("mypath").build(),
+ HoodieStorageUtils.getStorage(getDefaultStorageConf())),
+ new DefaultStreamContext(new SchemaTestProvider(typedProperties), Option.empty()));
+ assertTrue(source instanceof AvroKafkaSource);
+ }
+
+ @Test
+ void testCreateSourceWithErrors() {
+ SparkSession sparkSession = mock(SparkSession.class);
+ JavaSparkContext javaSparkContext = mock(JavaSparkContext.class);
+ TypedProperties typedProperties = new TypedProperties();
+ Throwable e = assertThrows(IOException.class, () -> UtilHelpers.createSource(
+ "org.apache.hudi.utilities.sources.AvroKafkaSource",
+ typedProperties,
+ javaSparkContext,
+ sparkSession,
+ new HoodieStreamerMetrics(
+ HoodieWriteConfig.newBuilder().withPath("mypath").build(),
+ HoodieStorageUtils.getStorage(getDefaultStorageConf())),
+ new DefaultStreamContext(new SchemaTestProvider(typedProperties), Option.empty())));
+ // We expect two constructors to complain about this error.
+ assertEquals(2, e.getSuppressed().length);
+ }
}