Hisoka-X commented on code in PR #8536:
URL: https://github.com/apache/seatunnel/pull/8536#discussion_r1921855247
##########
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java:
##########
@@ -163,87 +75,6 @@ public static Optional<? extends Factory> createSinkFactory(
}
}
- public static SeaTunnelSink createSink(
- Optional<? extends Factory> factory,
- Config sinkConfig,
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- JobContext jobContext,
- List<CatalogTable> catalogTables,
- ClassLoader classLoader) {
- boolean fallBack = !factory.isPresent() || isFallback(factory.get());
- if (fallBack) {
- SeaTunnelSink sink =
- fallbackCreateSink(
- sinkPluginDiscovery,
- PluginIdentifier.of(
- ENGINE_TYPE,
- PluginType.SINK.getType(),
- sinkConfig.getString(PLUGIN_NAME.key())),
- sinkConfig);
- sink.setJobContext(jobContext);
- sink.setTypeInfo(catalogTables.get(0).getSeaTunnelRowType());
- return sink;
- } else {
- if (catalogTables.size() > 1) {
- Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
- ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sinkConfig);
- catalogTables.forEach(
- catalogTable -> {
- TableSinkFactoryContext context =
-
TableSinkFactoryContext.replacePlaceholderAndCreate(
- catalogTable,
-
ReadonlyConfig.fromConfig(sinkConfig),
- classLoader,
- ((TableSinkFactory) factory.get())
-
.excludeTablePlaceholderReplaceKeys());
- ConfigValidator.of(context.getOptions())
- .validate(factory.get().optionRule());
- SeaTunnelSink action =
- ((TableSinkFactory) factory.get())
- .createSink(context)
- .createSink();
- action.setJobContext(jobContext);
- sinks.put(catalogTable.getTablePath(), action);
- });
- return FactoryUtil.createMultiTableSink(sinks, readonlyConfig,
classLoader);
- }
- TableSinkFactoryContext context =
- TableSinkFactoryContext.replacePlaceholderAndCreate(
- catalogTables.get(0),
- ReadonlyConfig.fromConfig(sinkConfig),
- classLoader,
- ((TableSinkFactory) factory.get())
- .excludeTablePlaceholderReplaceKeys());
-
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
- SeaTunnelSink sink =
- ((TableSinkFactory)
factory.get()).createSink(context).createSink();
- sink.setJobContext(jobContext);
- return sink;
- }
- }
-
- public static boolean isFallback(Factory factory) {
- try {
- ((TableSinkFactory) factory).createSink(null);
- } catch (Exception e) {
- if (e instanceof UnsupportedOperationException
- && "The Factory has not been implemented and the
deprecated Plugin will be used."
- .equals(e.getMessage())) {
- return true;
- }
- }
- return false;
- }
-
- public static SeaTunnelSink fallbackCreateSink(
- SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
- PluginIdentifier pluginIdentifier,
- Config pluginConfig) {
- SeaTunnelSink source =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
- source.prepare(pluginConfig);
- return source;
- }
-
public static void ensureJobModeMatch(JobContext jobContext,
SeaTunnelSource source) {
Review Comment:
missed invoke this method
##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java:
##########
@@ -65,31 +73,53 @@ public final class FactoryUtil {
public static <T, SplitT extends SourceSplit, StateT extends Serializable>
Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>>
createAndPrepareSource(
- ReadonlyConfig options, ClassLoader classLoader, String
factoryIdentifier) {
- return restoreAndPrepareSource(options, classLoader,
factoryIdentifier, null);
+ ReadonlyConfig options,
+ ClassLoader classLoader,
+ String factoryIdentifier,
+ Function<PluginIdentifier, SeaTunnelSource>
createSourceFunction) {
Review Comment:
```suggestion
Function<PluginIdentifier, SeaTunnelSource>
fallbackCreateSource) {
```
##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java:
##########
@@ -150,7 +181,8 @@ SeaTunnelSink<IN, StateT, CommitInfoT,
AggregatedCommitInfoT> createAndPrepareSi
CatalogTable catalogTable,
ReadonlyConfig config,
ClassLoader classLoader,
- String factoryIdentifier) {
+ String factoryIdentifier,
+ Function<PluginIdentifier, SeaTunnelSink>
createSinkFunction) {
Review Comment:
```suggestion
Function<PluginIdentifier, SeaTunnelSink>
fallbackCreateSink) {
```
##########
seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java:
##########
@@ -75,12 +76,24 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
* Add jar url to classloader. The different engine should have different
logic to add url into
* their own classloader
*/
- private static final BiConsumer<ClassLoader, URL>
DEFAULT_URL_TO_CLASSLOADER =
+ protected static final BiConsumer<ClassLoader, URL>
DEFAULT_URL_TO_CLASSLOADER =
(classLoader, url) -> {
- if (classLoader instanceof URLClassLoader) {
- ReflectionUtils.invoke(classLoader, "addURL", url);
- } else {
- throw new UnsupportedOperationException("can't support
custom load jar");
+ try {
+ if
(classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
Review Comment:
It only worked for flink, we should not put `SafetyNetWrapperClassLoader`
check at here.
##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java:
##########
@@ -351,4 +401,29 @@ public static SeaTunnelTransform<?>
createAndPrepareMultiTableTransform(
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createTransform(context).createTransform();
}
+
+ private static <T extends Factory> boolean isFallback(
+ ClassLoader classLoader,
+ Class<T> factoryClass,
+ String factoryId,
+ Consumer<T> virtualCreator) {
+ Optional<T> factory =
+ FactoryUtil.discoverOptionalFactory(classLoader, factoryClass,
factoryId);
+ if (!factory.isPresent()) {
+ return true;
+ }
+ try {
+ virtualCreator.accept(factory.get());
+ } catch (Exception e) {
+ if (e instanceof UnsupportedOperationException
+ && "The Factory has not been implemented and the
deprecated Plugin will be used."
+ .equals(e.getMessage())) {
+ log.warn(
+ "The Factory has not been implemented and the
deprecated Plugin will be used.");
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]