zhuzhurk commented on code in PR #21695:
URL: https://github.com/apache/flink/pull/21695#discussion_r1082948306


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java:
##########
@@ -200,6 +204,122 @@ void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
         assertThat(slowTasks).hasSize(2);
     }
 
+    @Test
+    void testBalancedInput() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex1 = createNoOpVertex(parallelism);
+        final JobVertex jobVertex2 = createNoOpVertex(parallelism);
+        jobVertex2.connectNewDataSetAsInput(
+                jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, 0);
+
+        final ExecutionVertex ev21 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0];
+        ev21.setInputBytes(1024);
+        final ExecutionVertex ev22 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[1];
+        ev22.setInputBytes(1024);
+        final ExecutionVertex ev23 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
+        ev23.setInputBytes(1024);
+
+        ev23.getCurrentExecutionAttempt().markFinished();
+        Thread.sleep(10);
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks).hasSize(2);
+    }
+
+    @Test
+    void testUnbalancedInput() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex1 = createNoOpVertex(parallelism);
+        final JobVertex jobVertex2 = createNoOpVertex(parallelism);
+        jobVertex2.connectNewDataSetAsInput(
+                jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, 0);
+
+        final ExecutionVertex ev21 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0];
+        ev21.setInputBytes(1024);
+        final ExecutionVertex ev22 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[1];
+        ev22.setInputBytes(1_024_000);
+        final ExecutionVertex ev23 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
+        ev23.setInputBytes(4_096_000);
+
+        ev21.getCurrentExecutionAttempt().markFinished();
+        Thread.sleep(10);
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        // no task will be detected as slow task
+        assertThat(slowTasks).hasSize(0);
+    }
+
+    @Test
+    void testSortedExecutionTimeWithInputBytes() {
+        // executions with unbalanced input bytes
+        ExecutionTimeWithInputBytes executionTimeWithInputBytes1 =
+                new ExecutionTimeWithInputBytes(10, 10);
+        ExecutionTimeWithInputBytes executionTimeWithInputBytes2 =
+                new ExecutionTimeWithInputBytes(10, 20);
+
+        List<ExecutionTimeWithInputBytes> pairList = new ArrayList<>();
+        pairList.add(executionTimeWithInputBytes1);
+        pairList.add(executionTimeWithInputBytes2);
+
+        List<ExecutionTimeWithInputBytes> sortedList =
+                pairList.stream().sorted().collect(Collectors.toList());
+
+        assertThat(sortedList.get(0)).isEqualTo(executionTimeWithInputBytes2);
+
+        // executions with balanced input bytes
+        ExecutionTimeWithInputBytes executionTimeWithInputBytes3 =
+                new ExecutionTimeWithInputBytes(20, 10);
+        ExecutionTimeWithInputBytes executionTimeWithInputBytes4 =
+                new ExecutionTimeWithInputBytes(10, 10);
+
+        pairList.clear();
+        pairList.add(executionTimeWithInputBytes3);
+        pairList.add(executionTimeWithInputBytes4);
+
+        sortedList = pairList.stream().sorted().collect(Collectors.toList());
+
+        assertThat(sortedList.get(0)).isEqualTo(executionTimeWithInputBytes4);
+
+        // executions with UNKNOWN input bytes
+        ExecutionTimeWithInputBytes executionTimeWithInputBytes5 =
+                new ExecutionTimeWithInputBytes(20, -1);
+        ExecutionTimeWithInputBytes executionTimeWithInputBytes6 =
+                new ExecutionTimeWithInputBytes(10, -1);
+
+        pairList.clear();
+        pairList.add(executionTimeWithInputBytes5);
+        pairList.add(executionTimeWithInputBytes6);
+
+        sortedList = pairList.stream().sorted().collect(Collectors.toList());
+
+        assertThat(sortedList.get(0)).isEqualTo(executionTimeWithInputBytes6);
+
+        // executions with assigned input bytes mixed with UNKNOWN input bytes

Review Comment:
   Mixing assigned input bytes with UNKNOWN input bytes is an unexpected case, and an exception should be thrown instead.
   We also need to test the case of 0 input bytes, with both a zero and a positive execution time.
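   For example, the mixed case and the zero-input-bytes cases could look roughly like this (a sketch only; the exact exception type and the expected ordering for 0 input bytes are assumptions to be aligned with the actual `ExecutionTimeWithInputBytes#compareTo` contract):
   ```java
   // executions with assigned input bytes mixed with UNKNOWN input bytes:
   // assumed to throw (the exception type is an assumption)
   assertThatThrownBy(
                   () ->
                           new ExecutionTimeWithInputBytes(10, 1024)
                                   .compareTo(new ExecutionTimeWithInputBytes(10, -1)))
           .isInstanceOf(IllegalArgumentException.class);

   // executions with 0 input bytes, covering both a zero and a positive execution time;
   // the expected ordering here is an assumption
   ExecutionTimeWithInputBytes zeroTimeZeroBytes = new ExecutionTimeWithInputBytes(0, 0);
   ExecutionTimeWithInputBytes positiveTimeZeroBytes = new ExecutionTimeWithInputBytes(10, 0);
   assertThat(positiveTimeZeroBytes).isGreaterThan(zeroTimeZeroBytes);
   ```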



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java:
##########
@@ -200,6 +204,122 @@ void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
         assertThat(slowTasks).hasSize(2);
     }
 
+    @Test
+    void testBalancedInput() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex1 = createNoOpVertex(parallelism);
+        final JobVertex jobVertex2 = createNoOpVertex(parallelism);
+        jobVertex2.connectNewDataSetAsInput(
+                jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, 0);
+
+        final ExecutionVertex ev21 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0];
+        ev21.setInputBytes(1024);
+        final ExecutionVertex ev22 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[1];
+        ev22.setInputBytes(1024);
+        final ExecutionVertex ev23 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
+        ev23.setInputBytes(1024);
+
+        ev23.getCurrentExecutionAttempt().markFinished();
+        Thread.sleep(10);

Review Comment:
   This sleep is not needed because the still-running ev21/ev22 are sure to have a longer execution time than the finished ev23.
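   Without it, the end of the test could simply be (a sketch, keeping the same assertion):
   ```java
   // ev21/ev22 keep running after ev23 finishes, so their execution time is
   // already at least as long as the finished baseline; no delay is needed
   ev23.getCurrentExecutionAttempt().markFinished();

   final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
           slowTaskDetector.findSlowTasks(executionGraph);

   assertThat(slowTasks).hasSize(2);
   ```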



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java:
##########
@@ -200,6 +204,122 @@ void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
         assertThat(slowTasks).hasSize(2);
     }
 
+    @Test
+    void testBalancedInput() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex1 = createNoOpVertex(parallelism);
+        final JobVertex jobVertex2 = createNoOpVertex(parallelism);
+        jobVertex2.connectNewDataSetAsInput(
+                jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, 0);
+
+        final ExecutionVertex ev21 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0];
+        ev21.setInputBytes(1024);
+        final ExecutionVertex ev22 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[1];
+        ev22.setInputBytes(1024);
+        final ExecutionVertex ev23 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
+        ev23.setInputBytes(1024);
+
+        ev23.getCurrentExecutionAttempt().markFinished();
+        Thread.sleep(10);
+
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks).hasSize(2);
+    }
+
+    @Test
+    void testUnbalancedInput() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex1 = createNoOpVertex(parallelism);
+        final JobVertex jobVertex2 = createNoOpVertex(parallelism);
+        jobVertex2.connectNewDataSetAsInput(
+                jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
+                createSlowTaskDetector(0.3, 1, 0);
+
+        final ExecutionVertex ev21 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[0];
+        ev21.setInputBytes(1024);
+        final ExecutionVertex ev22 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[1];
+        ev22.setInputBytes(1_024_000);
+        final ExecutionVertex ev23 =
+                executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
+        ev23.setInputBytes(4_096_000);
+
+        ev21.getCurrentExecutionAttempt().markFinished();
+        Thread.sleep(10);

Review Comment:
   This case is unstable: on a very slow test machine, several seconds may have passed by the time `findSlowTasks(...)` is executed (this sometimes happens), and ev22/ev23 may then be recognized as slow tasks. A safer way is to execute this sleep before the `markFinished()` call and increase the sleep time to 1000ms. It's usually impossible for more than 1000 seconds to pass before `findSlowTasks` is executed.
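   A sketch of the suggested reordering:
   ```java
   // sleep before marking ev21 finished so the finished execution time is at
   // least ~1000ms; given the much larger input bytes of ev22/ev23, they would
   // have to run for an unrealistically long time to be classified as slow
   Thread.sleep(1000);
   ev21.getCurrentExecutionAttempt().markFinished();

   final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
           slowTaskDetector.findSlowTasks(executionGraph);

   // no task should be detected as slow
   assertThat(slowTasks).hasSize(0);
   ```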


