gyfora commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1598242464
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ########## @@ -228,6 +230,14 @@ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { this.timeCharacteristic = timeCharacteristic; } + public void setLineageGraph(LineageGraph lineageGraph) { Review Comment: This method is not called anywhere, how did we test this functionality? ########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ########## @@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute( clusterClientProvider, jobID, userCodeClassloader)) - .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { + RuntimeExecutionMode executionMode = + jobGraph.getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE); + if (jobStatusChangedListeners.size() > 0) { + jobStatusChangedListeners.forEach( + listener -> + listener.onEvent( + new DefaultJobCreatedEvent( + jobGraph.getJobID(), + jobGraph.getName(), + ((StreamGraph) pipeline) + .getLineageGraph(), + executionMode))); + } + } + clusterClient.close(); Review Comment: Should this be in a finally block in case the listeners throws an error? ########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ########## @@ -55,11 +65,21 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements CacheSupportedPipelineExecutor { + private final ExecutorService executorService = + Executors.newFixedThreadPool( + 4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); private final ClientFactory clusterClientFactory; + private final Configuration configuration; + private final List<JobStatusChangedListener> jobStatusChangedListeners; - public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { + public AbstractSessionClusterExecutor( + @Nonnull final ClientFactory clusterClientFactory, Configuration configuration) { this.clusterClientFactory = checkNotNull(clusterClientFactory); + this.configuration = configuration; + this.jobStatusChangedListeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + this.getClass().getClassLoader(), configuration, executorService); Review Comment: Should we use the class class loader or the thread context class loader here? ########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ########## @@ -55,11 +65,21 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements CacheSupportedPipelineExecutor { + private final ExecutorService executorService = + Executors.newFixedThreadPool( + 4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); Review Comment: Why do we need 4 threads? Do we expect concurrent calls here? ########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ########## @@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute( clusterClientProvider, jobID, userCodeClassloader)) - .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { + RuntimeExecutionMode executionMode = + jobGraph.getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE); + if (jobStatusChangedListeners.size() > 0) { Review Comment: This check has no effects, forEach does nothing if the list is empty -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org