yihua commented on code in PR #10918:
URL: https://github.com/apache/hudi/pull/10918#discussion_r1599449303
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -72,21 +73,23 @@ public S3EventsHoodieIncrSource(
TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
+ HoodieIngestionMetrics metrics,
Review Comment:
Based on the reflection util `UtilHelpers#createSource`, this argument
should be after the `SchemaProvider` instance.
```
public static Source createSource(String sourceClass, TypedProperties cfg,
JavaSparkContext jssc,
SparkSession sparkSession,
HoodieIngestionMetrics metrics, StreamContext streamContext) throws IOException
{
// All possible constructors.
Class<?>[] constructorArgsStreamContextMetrics = new Class<?>[]
{TypedProperties.class, JavaSparkContext.class, SparkSession.class,
HoodieIngestionMetrics.class, StreamContext.class};
Class<?>[] constructorArgsStreamContext = new Class<?>[]
{TypedProperties.class, JavaSparkContext.class, SparkSession.class,
StreamContext.class};
Class<?>[] constructorArgsMetrics = new Class<?>[]
{TypedProperties.class, JavaSparkContext.class, SparkSession.class,
SchemaProvider.class, HoodieIngestionMetrics.class};
Class<?>[] constructorArgs = new Class<?>[] {TypedProperties.class,
JavaSparkContext.class, SparkSession.class, SchemaProvider.class};
// List of constructor and their respective arguments.
List<Pair<Class<?>[], Object[]>> sourceConstructorAndArgs = new
ArrayList<>();
sourceConstructorAndArgs.add(Pair.of(constructorArgsStreamContextMetrics, new
Object[] {cfg, jssc, sparkSession, metrics, streamContext}));
sourceConstructorAndArgs.add(Pair.of(constructorArgsStreamContext, new
Object[] {cfg, jssc, sparkSession, streamContext}));
sourceConstructorAndArgs.add(Pair.of(constructorArgsMetrics, new
Object[] {cfg, jssc, sparkSession, streamContext.getSchemaProvider(),
metrics}));
sourceConstructorAndArgs.add(Pair.of(constructorArgs, new Object[] {cfg,
jssc, sparkSession, streamContext.getSchemaProvider()}));
HoodieException sourceClassLoadException = null;
for (Pair<Class<?>[], Object[]> constructor : sourceConstructorAndArgs) {
try {
return (Source) ReflectionUtils.loadClass(sourceClass,
constructor.getLeft(), constructor.getRight());
} catch (HoodieException e) {
sourceClassLoadException = e;
} catch (Throwable t) {
throw new IOException("Could not load source class " + sourceClass,
t);
}
}
throw new IOException("Could not load source class " + sourceClass,
sourceClassLoadException);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]