XComp commented on a change in pull request #15030:
URL: https://github.com/apache/flink/pull/15030#discussion_r584608406



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -290,6 +303,85 @@ public void testResourceTimeout() throws Exception {
         assertThat(b, is(true));
     }
 
+    @Test
+    public void testNumRestartsMetric() throws Exception {
+        final CompletableFuture<Gauge<Integer>> numRestartsMetricFuture = new CompletableFuture<>();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    if (MetricNames.NUM_RESTARTS.equals(name)) {
+                                        numRestartsMetricFuture.complete((Gauge<Integer>) metric);
+                                    }
+                                })
+                        .build();
+
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(
+                                new JobManagerJobMetricGroup(
+                                        metricRegistry,
+                                        createUnregisteredJobManagerMetricGroup(),
+                                        new JobID(),
+                                        "jobName"))
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final Gauge<Integer> numRestartsMetric = numRestartsMetricFuture.get();
+        assertThat(numRestartsMetric.getValue(), is(0));

Review comment:
       I'm wondering whether we should move this assert even further down, just
before the new slot offering triggers a restart. That would make it clearer which
operation is expected to change the metric.
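For illustration, a minimal self-contained sketch of the suggested assert placement. It assumes a hypothetical `FakeScheduler` whose slot offer bumps a `numRestarts` gauge; it is a stand-in, not the `AdaptiveScheduler` API:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class NumRestartsAssertPlacementSketch {

    /** Hypothetical stand-in for the scheduler; it only exposes a numRestarts gauge. */
    static final class FakeScheduler {
        private final AtomicInteger numRestarts = new AtomicInteger();

        Supplier<Integer> numRestartsGauge() {
            return numRestarts::get;
        }

        void startScheduling() {
            // Starting the job is not a restart, so the counter stays untouched.
        }

        void offerAdditionalSlot() {
            // Assumption: the extra slot allows a higher parallelism, which restarts the job.
            numRestarts.incrementAndGet();
        }
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        FakeScheduler scheduler = new FakeScheduler();
        Supplier<Integer> numRestarts = scheduler.numRestartsGauge();

        scheduler.startScheduling();

        // Assert right before the operation that is expected to change the metric...
        check(numRestarts.get() == 0, "no restart expected before the slot offer");
        scheduler.offerAdditionalSlot();
        // ...and right after it, so it is obvious which call caused the change.
        check(numRestarts.get() == 1, "the slot offer should trigger exactly one restart");
    }
}
```

Keeping the "before" and "after" checks adjacent to the triggering call means a failure points directly at the operation under test rather than at the test setup.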

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -290,6 +303,85 @@ public void testResourceTimeout() throws Exception {
+
+        scheduler.startScheduling();
+
+        final TaskExecutorGateway taskManagerGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setCancelTaskFunction(
+                                executionAttemptId -> {
+                                    mainThreadExecutor.execute(
+                                            () ->
+                                                    scheduler.updateTaskExecutionState(
+                                                            new TaskExecutionState(
+                                                                    executionAttemptId,
+                                                                    ExecutionState.CANCELED)));
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+
+        declarativeSlotPool.offerSlots(
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                new LocalTaskManagerLocation(),
+                new RpcTaskManagerGateway(taskManagerGateway, JobMasterId.generate()),

Review comment:
       A minor thing: is there a reason why we use `RpcTaskManagerGateway`
instead of `SimpleAckingTaskManagerGateway` here? It wouldn't make a big
difference, but it would reduce the number of instances created in this test...

##########
File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
##########
@@ -166,6 +166,19 @@ public boolean awaitTermination(long timeout, TimeUnit unit) {
     // Execution triggering and access to the queued tasks
     // ------------------------------------------------------------------------
 
+    /**
+     * Executes all runnable and scheduled non-periodic tasks until none are left to run. This is
+     * essentially a combination of {@link #triggerAll()} and {@link
+     * #triggerNonPeriodicScheduledTasks()} that allows making a test agnostic of how exactly a
+     * runnable is passed to the executor.
+     */
+    public void triggerAllNonPeriodicTasks() {

Review comment:
       Just to make sure I understand: the method is added to cover cases where a
`Runnable` or non-periodic scheduled task is enqueued while this method is already
being processed? Hence, it benefits tests that do not explicitly wait before
triggering the execution of those tasks?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
