Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1372653384
##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java:
##########
@@ -64,6 +83,81 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
}
+ @Override
+ protected LogicalDag getLogicalDag() {
+ ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+ actions.addAll(immutablePair.getLeft());
+ // Enable upload connector jar package to engine server, automatically upload connector Jar
+ // packages
+ // and dependent third-party Jar packages to the server before job execution.
+ // Enabling this configuration does not require the server to hold all connector Jar
+ // packages.
Review Comment:
```suggestion
// Enable upload connector jar package to engine server, automatically upload connector Jar
// packages and dependent third-party Jar packages to the server before job execution.
// Enabling this configuration does not require the server to hold all connector Jar packages.
```
##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java:
##########
@@ -64,6 +83,81 @@ protected MultipleTableJobConfigParser getJobConfigParser() {
jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
}
+ @Override
+ protected LogicalDag getLogicalDag() {
+ ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse();
+ actions.addAll(immutablePair.getLeft());
+ // Enable upload connector jar package to engine server, automatically upload connector Jar
+ // packages
+ // and dependent third-party Jar packages to the server before job execution.
+ // Enabling this configuration does not require the server to hold all connector Jar
+ // packages.
+ boolean enableUploadConnectorJarPackage =
+ seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();
+ if (enableUploadConnectorJarPackage) {
+ Set<ConnectorJarIdentifier> commonJarIdentifiers =
+ connectorPackageClient.uploadCommonPluginJars(
+ Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
+ Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
+ Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
+ uploadActionPluginJar(actions, pluginJarIdentifiers);
+ Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
+ connectorJarIdentifiers.addAll(commonJarIdentifiers);
+ connectorJarIdentifiers.addAll(pluginJarIdentifiers);
+ jarUrls.addAll(commonPluginJarUrls);
+ jarUrls.addAll(connectorPluginJarUrls);
+ actions.forEach(
+ action -> {
+ addCommonPluginJarsToAction(
+ action, commonPluginJarUrls, commonJarIdentifiers);
+ });
+ actions.forEach(
+ action -> {
+ org.apache.seatunnel.engine.core.dag.actions.Config config =
+ action.getConfig();
+ });
Review Comment:
Is this part right? The `config` local in the second `actions.forEach` is assigned but never used.
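
A minimal sketch of why a loop of this shape draws a question, using hypothetical stand-in types (`Action` and its `String` config here are illustrative assumptions, not the real SeaTunnel classes). The first method mirrors the questioned lambda and has no observable effect; the second shows a loop that actually consumes each config.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NoOpForEachSketch {
    // Hypothetical stand-in for the engine's Action type (assumption for illustration).
    static final class Action {
        private final String config;

        Action(String config) {
            this.config = config;
        }

        String getConfig() {
            return config;
        }
    }

    // Same shape as the questioned loop: the local is assigned, then discarded.
    static List<String> collectNothing(List<Action> actions) {
        List<String> seen = new ArrayList<>();
        actions.forEach(
                action -> {
                    String config = action.getConfig(); // read but never used
                });
        return seen;
    }

    // A loop that does something with each config.
    static List<String> collectConfigs(List<Action> actions) {
        List<String> seen = new ArrayList<>();
        actions.forEach(action -> seen.add(action.getConfig()));
        return seen;
    }

    public static void main(String[] args) {
        List<Action> actions = Arrays.asList(new Action("source"), new Action("sink"));
        System.out.println(collectNothing(actions).size()); // prints 0
        System.out.println(collectConfigs(actions).size()); // prints 2
    }
}
```

If the loop in the PR is a leftover, removing it would not change behavior; if it was meant to update each action, the missing step is whatever should be done with `config`.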