gyfora commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1598242464


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -228,6 +230,14 @@ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
         this.timeCharacteristic = timeCharacteristic;
     }
 
+    public void setLineageGraph(LineageGraph lineageGraph) {

Review Comment:
   This method is not called anywhere. How was this functionality tested?



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute(
                                                     clusterClientProvider,
                                                     jobID,
                                                     userCodeClassloader))
-                    .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
+                    .whenCompleteAsync(
+                            (jobClient, throwable) -> {
+                                if (throwable == null) {
+                                    RuntimeExecutionMode executionMode =
+                                            jobGraph.getJobConfiguration()
+                                                    .get(ExecutionOptions.RUNTIME_MODE);
+                                    if (jobStatusChangedListeners.size() > 0) {
+                                        jobStatusChangedListeners.forEach(
+                                                listener ->
+                                                        listener.onEvent(
+                                                                new DefaultJobCreatedEvent(
+                                                                        jobGraph.getJobID(),
+                                                                        jobGraph.getName(),
+                                                                        ((StreamGraph) pipeline)
+                                                                                .getLineageGraph(),
+                                                                        executionMode)));
+                                    }
+                                }
+                                clusterClient.close();

Review Comment:
   Should this be in a finally block in case one of the listeners throws an error?
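
   A minimal sketch of the pattern in question, not the PR's code: `FakeClient` and `notifyAndClose` are hypothetical stand-ins for the cluster client and the listener callback. It only illustrates that a `finally` block still closes the client when a listener throws, while the exception continues to propagate.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class CloseInFinallySketch {

    /** Hypothetical stand-in for the cluster client; it only records that close() ran. */
    static final class FakeClient implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Notifies every listener, then closes the client even if a listener throws. */
    static void notifyAndClose(
            List<Consumer<String>> listeners, String event, FakeClient client) {
        try {
            listeners.forEach(listener -> listener.accept(event));
        } finally {
            client.close();
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        Consumer<String> failing =
                event -> {
                    throw new IllegalStateException("listener failed");
                };
        try {
            notifyAndClose(Collections.singletonList(failing), "job-created", client);
        } catch (IllegalStateException e) {
            // The listener failure still reaches the caller.
        }
        System.out.println("client closed: " + client.closed.get());
    }
}
```

   Whether a listener failure should also fail the returned future is a separate decision; the sketch only guarantees the cleanup.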



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -55,11 +65,21 @@
 public class AbstractSessionClusterExecutor<
                 ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
         implements CacheSupportedPipelineExecutor {
+    private final ExecutorService executorService =
+            Executors.newFixedThreadPool(
+                    4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
 
     private final ClientFactory clusterClientFactory;
+    private final Configuration configuration;
+    private final List<JobStatusChangedListener> jobStatusChangedListeners;
 
-    public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+    public AbstractSessionClusterExecutor(
+            @Nonnull final ClientFactory clusterClientFactory, Configuration configuration) {
         this.clusterClientFactory = checkNotNull(clusterClientFactory);
+        this.configuration = configuration;
+        this.jobStatusChangedListeners =
+                JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+                        this.getClass().getClassLoader(), configuration, executorService);

Review Comment:
   Should we use the class's own class loader or the thread context class loader here?
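
   To make the distinction concrete, here is a small standalone sketch (the class and method names are hypothetical, not from the PR). It shows that the loader that defined a class and the loader installed on the current thread can be different objects, which matters when listener factories are packaged in user jars. It does not settle which one this constructor should use.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderChoiceSketch {

    /** Loader that defined this class, i.e. what getClass().getClassLoader() returns. */
    static ClassLoader definingLoader() {
        return ClassLoaderChoiceSketch.class.getClassLoader();
    }

    /** Loader currently installed on the calling thread. */
    static ClassLoader contextLoader() {
        return Thread.currentThread().getContextClassLoader();
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = contextLoader();
        // An empty child loader stands in for a user-code class loader (assumption).
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], definingLoader())) {
            Thread.currentThread().setContextClassLoader(userLoader);
            System.out.println("same loader: " + (definingLoader() == contextLoader()));
        } finally {
            // Restore the thread's original context class loader.
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}
```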



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -55,11 +65,21 @@
 public class AbstractSessionClusterExecutor<
                 ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
         implements CacheSupportedPipelineExecutor {
+    private final ExecutorService executorService =
+            Executors.newFixedThreadPool(
+                    4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));

Review Comment:
   Why do we need 4 threads? Do we expect concurrent calls here?



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute(
                                                     clusterClientProvider,
                                                     jobID,
                                                     userCodeClassloader))
-                    .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
+                    .whenCompleteAsync(
+                            (jobClient, throwable) -> {
+                                if (throwable == null) {
+                                    RuntimeExecutionMode executionMode =
+                                            jobGraph.getJobConfiguration()
+                                                    .get(ExecutionOptions.RUNTIME_MODE);
+                                    if (jobStatusChangedListeners.size() > 0) {

Review Comment:
   This check has no effect; forEach does nothing if the list is empty.
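
   A quick standalone illustration, with a hypothetical `notifyListeners` helper standing in for the listener loop: calling `forEach` on an empty list invokes the lambda zero times, so the size guard can be dropped without changing behavior.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class EmptyForEachSketch {

    /** Calls every listener without a size guard and returns how many were invoked. */
    static int notifyListeners(List<Consumer<String>> listeners, String event) {
        int[] invoked = {0};
        listeners.forEach(
                listener -> {
                    listener.accept(event);
                    invoked[0]++;
                });
        return invoked[0];
    }

    public static void main(String[] args) {
        List<Consumer<String>> none = Collections.emptyList();
        List<Consumer<String>> one = new ArrayList<>();
        one.add(event -> System.out.println("received " + event));

        System.out.println("invoked on empty list: " + notifyListeners(none, "job-created"));
        System.out.println("invoked on one listener: " + notifyListeners(one, "job-created"));
    }
}
```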



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.