wanglijie95 commented on code in PR #20321:
URL: https://github.com/apache/flink/pull/20321#discussion_r926376468


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -149,6 +159,16 @@ public SpeculativeScheduler(
         this.blocklistOperations = checkNotNull(blocklistOperations);
 
         this.slowTaskDetector = new 
ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+
+        this.numEffectiveSpeculativeExecutionsCounter = new SimpleCounter();
+        this.registerMetrics(jobManagerJobMetricGroup);

Review Comment:
   I think it would be better to move the registration to 
`startSchedulingInternal`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -128,6 +128,12 @@
     /** The unique ID marking the specific execution instant of the task. */
     private final ExecutionAttemptID attemptId;
 
+    /**
+     * This field indicates whether the execution is the original execution of 
an execution vertex,
+     * i.e. it is created along with the creation of resetting of the 
execution vertex.
+     */
+    private final boolean original;

Review Comment:
   Maybe we can add a `originalAttemptNumber` in `SpeculativeExecutionVertex` 
to record the original execution? And then we don't need to change the  logic 
in `Execution` and `ExecutionVertex`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java:
##########
@@ -85,6 +91,10 @@ public class SpeculativeScheduler extends 
AdaptiveBatchScheduler
 
     private final SlowTaskDetector slowTaskDetector;
 
+    private long numSlowExecutionVertices;

Review Comment:
   How about change the name to `currentNumSlowExecutionVertices`? I think the 
`numXXX` generally represents an accumulated value (like 
`numEffectiveSpeculativeExecutions`).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void 
testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {

Review Comment:
   public is not needed



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void 
testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {

Review Comment:
    `testSpeculativeExecutionCombinedWithAdaptiveScheduling` has the same 
problem



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java:
##########
@@ -385,6 +385,60 @@ public void 
testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exce
         assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
     }
 
+    @Test
+    public void testNumSlowExecutionVerticesMetric() {
+        final SpeculativeScheduler scheduler = 
createSchedulerAndStartScheduling();
+        final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+        final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+        notifySlowTask(scheduler, attempt1);
+        assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1);
+
+        // notify a slow vertex twice
+        notifySlowTask(scheduler, attempt1);
+        assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1);
+
+        // vertex no longer slow
+        scheduler.notifySlowTasks(Collections.emptyMap());
+        assertThat(scheduler.getNumSlowExecutionVertices()).isZero();
+    }
+
+    @Test
+    public void testEffectiveSpeculativeExecutionsMetric() {

Review Comment:
   public is not needed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to