gyfora commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1598242464
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -228,6 +230,14 @@ public void setTimeCharacteristic(TimeCharacteristic
timeCharacteristic) {
this.timeCharacteristic = timeCharacteristic;
}
+ public void setLineageGraph(LineageGraph lineageGraph) {
Review Comment:
This method is not called anywhere, how did we test this functionality?
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute(
clusterClientProvider,
jobID,
userCodeClassloader))
- .whenCompleteAsync((ignored1, ignored2) ->
clusterClient.close());
+ .whenCompleteAsync(
+ (jobClient, throwable) -> {
+ if (throwable == null) {
+ RuntimeExecutionMode executionMode =
+ jobGraph.getJobConfiguration()
+
.get(ExecutionOptions.RUNTIME_MODE);
+ if (jobStatusChangedListeners.size() > 0) {
+ jobStatusChangedListeners.forEach(
+ listener ->
+ listener.onEvent(
+ new
DefaultJobCreatedEvent(
+
jobGraph.getJobID(),
+
jobGraph.getName(),
+
((StreamGraph) pipeline)
+
.getLineageGraph(),
+
executionMode)));
+ }
+ }
+ clusterClient.close();
Review Comment:
Should this be in a finally block in case the listeners throws an error?
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -55,11 +65,21 @@
public class AbstractSessionClusterExecutor<
ClusterID, ClientFactory extends
ClusterClientFactory<ClusterID>>
implements CacheSupportedPipelineExecutor {
+ private final ExecutorService executorService =
+ Executors.newFixedThreadPool(
+ 4, new
ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
private final ClientFactory clusterClientFactory;
+ private final Configuration configuration;
+ private final List<JobStatusChangedListener> jobStatusChangedListeners;
- public AbstractSessionClusterExecutor(@Nonnull final ClientFactory
clusterClientFactory) {
+ public AbstractSessionClusterExecutor(
+ @Nonnull final ClientFactory clusterClientFactory, Configuration
configuration) {
this.clusterClientFactory = checkNotNull(clusterClientFactory);
+ this.configuration = configuration;
+ this.jobStatusChangedListeners =
+ JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+ this.getClass().getClassLoader(), configuration,
executorService);
Review Comment:
Should we use the class class loader or the thread context class loader here?
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -55,11 +65,21 @@
public class AbstractSessionClusterExecutor<
ClusterID, ClientFactory extends
ClusterClientFactory<ClusterID>>
implements CacheSupportedPipelineExecutor {
+ private final ExecutorService executorService =
+ Executors.newFixedThreadPool(
+ 4, new
ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
Review Comment:
Why do we need 4 threads? Do we expect concurrent calls here?
##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute(
clusterClientProvider,
jobID,
userCodeClassloader))
- .whenCompleteAsync((ignored1, ignored2) ->
clusterClient.close());
+ .whenCompleteAsync(
+ (jobClient, throwable) -> {
+ if (throwable == null) {
+ RuntimeExecutionMode executionMode =
+ jobGraph.getJobConfiguration()
+
.get(ExecutionOptions.RUNTIME_MODE);
+ if (jobStatusChangedListeners.size() > 0) {
Review Comment:
This check has no effects, forEach does nothing if the list is empty
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]