XComp commented on a change in pull request #13560:
URL: https://github.com/apache/flink/pull/13560#discussion_r502259949



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
##########
@@ -98,6 +101,42 @@ public JobDetails(
                this.tasksPerState = checkNotNull(tasksPerState);
                this.numTasks = numTasks;
        }
+
+       public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
+               JobStatus status = job.getState();
+
+               long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
+               long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
+               long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started;
+
+               int[] countsPerStatus = new int[ExecutionState.values().length];
+               long lastChanged = 0;
+               int numTotalTasks = 0;
+
+               for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
+                       AccessExecutionVertex[] vertices = ejv.getTaskVertices();

Review comment:
       ```suggestion
                        AccessExecutionVertex[] taskVertices = ejv.getTaskVertices();
   ```
   Could we rename this variable to improve readability? This code segment
   deals with different types of vertices, so a more specific name helps.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
##########
@@ -98,6 +101,42 @@ public JobDetails(
                this.tasksPerState = checkNotNull(tasksPerState);
                this.numTasks = numTasks;
        }
+
+       public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
+               JobStatus status = job.getState();
+
+               long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
+               long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
+               long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started;
+
+               int[] countsPerStatus = new int[ExecutionState.values().length];
+               long lastChanged = 0;
+               int numTotalTasks = 0;
+
+               for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
+                       AccessExecutionVertex[] vertices = ejv.getTaskVertices();
+                       numTotalTasks += vertices.length;
+
+                       for (AccessExecutionVertex vertex : vertices) {

Review comment:
       ```suggestion
                        for (AccessExecutionVertex taskVertex : taskVertices) {
   ```
   Same here with the `vertex` variable.
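
With both renames applied, the nested loop might read roughly like the sketch below. `JobVertexGroup` and `TaskVertex` are hypothetical stand-ins for `AccessExecutionJobVertex` and `AccessExecutionVertex`, introduced only so the snippet compiles on its own; the counting logic is simplified and is not the code from the PR.

```java
import java.util.List;

public class VertexNamingSketch {

    /** Hypothetical stand-in for AccessExecutionVertex. */
    static final class TaskVertex {
        final int stateOrdinal;

        TaskVertex(int stateOrdinal) {
            this.stateOrdinal = stateOrdinal;
        }
    }

    /** Hypothetical stand-in for AccessExecutionJobVertex. */
    static final class JobVertexGroup {
        private final TaskVertex[] taskVertices;

        JobVertexGroup(TaskVertex[] taskVertices) {
            this.taskVertices = taskVertices;
        }

        TaskVertex[] getTaskVertices() {
            return taskVertices;
        }
    }

    /** Counts task vertices per state ordinal across all job vertices. */
    static int[] countPerState(List<JobVertexGroup> jobVertices, int numStates) {
        int[] countsPerStatus = new int[numStates];
        for (JobVertexGroup ejv : jobVertices) {
            // "taskVertices" / "taskVertex" make it obvious which level of the graph is iterated
            TaskVertex[] taskVertices = ejv.getTaskVertices();
            for (TaskVertex taskVertex : taskVertices) {
                countsPerStatus[taskVertex.stateOrdinal]++;
            }
        }
        return countsPerStatus;
    }
}
```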

##########
File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
##########
@@ -303,6 +303,55 @@ public void testCancel() throws Exception {
                BlockingInvokable.reset();
        }
 
+       /**
+        * See FLINK-19518. This test ensures that the /jobs/overview handler shows a duration != 0.
+        *
+        */
+       @Test
+       public void testJobOverviewHandler() throws Exception {
+               // this only works if there is no active job at this point
+               assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
+
+               // Create a task
+               final JobVertex sender = new JobVertex("Sender");
+               sender.setParallelism(2);
+               sender.setInvokableClass(BlockingInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+               final JobID jid = jobGraph.getJobID();
+
+               ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+               clusterClient.submitJob(jobGraph).get();
+
+               // wait for job to show up
+               while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
+                       Thread.sleep(10);
+               }
+
+               // wait for tasks to be properly running
+               BlockingInvokable.latch.await();
+
+               final Duration testTimeout = Duration.ofMinutes(2);
+               final LocalTime deadline = LocalTime.now().plus(testTimeout);

Review comment:
       The `deadline` variable is never used. Are we missing an `assert` in 
this test or is this variable obsolete?
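
If the deadline is meant to bound the waiting in this test, one option is a small polling helper that gives up once the timeout elapses. The sketch below is only an illustration under that assumption: `DeadlinePollingSketch` and `waitUntil` are hypothetical names, not existing Flink utilities. It uses `System.nanoTime()` rather than `LocalTime`, because `LocalTime.plus` wraps around midnight.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class DeadlinePollingSketch {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition held before the deadline, false otherwise
     * (including when the waiting thread is interrupted).
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        // nanoTime is monotonic, so the deadline is unaffected by wall-clock adjustments
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

In the test, the result could then be checked with `assertTrue(...)` around a call that passes `testTimeout`, so that the timeout actually takes effect. A condition that throws checked exceptions would have to be wrapped first.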

##########
File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
##########
@@ -303,6 +303,55 @@ public void testCancel() throws Exception {
                BlockingInvokable.reset();
        }
 
+       /**
+        * See FLINK-19518. This test ensures that the /jobs/overview handler shows a duration != 0.
+        *

Review comment:
       I know it's a minor thing, but the extra empty ` *` line in this Javadoc comment is not necessary.

##########
File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
##########
@@ -303,6 +303,55 @@ public void testCancel() throws Exception {
                BlockingInvokable.reset();
        }
 
+       /**
+        * See FLINK-19518. This test ensures that the /jobs/overview handler shows a duration != 0.
+        *
+        */
+       @Test
+       public void testJobOverviewHandler() throws Exception {
+               // this only works if there is no active job at this point
+               assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
+
+               // Create a task
+               final JobVertex sender = new JobVertex("Sender");
+               sender.setParallelism(2);
+               sender.setInvokableClass(BlockingInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+               final JobID jid = jobGraph.getJobID();

Review comment:
       `jid` is never used and can be removed.




