zentol commented on a change in pull request #17735:
URL: https://github.com/apache/flink/pull/17735#discussion_r748324856



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
##########
@@ -486,6 +492,101 @@ public void testNumRestartsMetric() throws Exception {
         assertThat(numRestartsMetric.getValue(), is(1));
     }
 
+    @Test
+    public void testStatusMetrics() throws Exception {
+        final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>();
+        final CompletableFuture<DownTimeGauge> downTimeMetricFuture = new CompletableFuture<>();
+        final CompletableFuture<RestartTimeGauge> restartTimeMetricFuture =
+                new CompletableFuture<>();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    switch (name) {
+                                        case UpTimeGauge.METRIC_NAME:
+                                            upTimeMetricFuture.complete((UpTimeGauge) metric);
+                                            break;
+                                        case DownTimeGauge.METRIC_NAME:
+                                            downTimeMetricFuture.complete((DownTimeGauge) metric);
+                                            break;
+                                        case RestartTimeGauge.METRIC_NAME:
+                                            restartTimeMetricFuture.complete(
+                                                    (RestartTimeGauge) metric);
+                                            break;
+                                    }
+                                })
+                        .build();
+
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(
+                                JobManagerMetricGroup.createJobManagerMetricGroup(
+                                                metricRegistry, "localhost")
+                                        .addJob(new JobID(), "jobName"))
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final UpTimeGauge upTimeGauge = upTimeMetricFuture.get();
+        final DownTimeGauge downTimeGauge = downTimeMetricFuture.get();
+        final RestartTimeGauge restartTimeGauge = restartTimeMetricFuture.get();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
+
+        taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the first task submission
+        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+
+        // sleep a bit to ensure uptime is > 0
+        Thread.sleep(10L);
+
+        assertThat(upTimeGauge.getValue(), greaterThan(0L));
+        assertThat(downTimeGauge.getValue(), is(0L));
+        assertThat(restartTimeGauge.getValue(), is(0L));
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    // offer more slots, which will cause a restart in order to scale up
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the second task submissions
+        taskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5));
+
+        // sleep a bit to ensure uptime is > 0

Review comment:
       I generally like doing that, but I'm not sure it would work properly for the AdaptiveScheduler, because it requires that truly all time measurements go through the clock. That is easy to ensure for smaller, self-contained components, but not here: we re-use parts of the SchedulerBase/DefaultScheduler, there are multiple state classes, internally there is the ExecutionGraph (EG), ...
   It would be a bit unsatisfactory to introduce a clock but only use it in one place :/
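The clock-based alternative to `Thread.sleep` weighed in the comment can be sketched as below. This is a minimal, self-contained illustration, not Flink code: `ManualTimeSource`, `UpTimeTracker`, `onRunning`, `onLeftRunning` and `getUpTimeMillis` are hypothetical names, and the clock is modelled as a plain `java.util.function.LongSupplier` (production code could pass `System::currentTimeMillis`). It only works because the tracker reads time exclusively from the injected supplier.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class ManualClockSketch {

    /** Hypothetical test clock: time only moves when the test advances it. */
    static final class ManualTimeSource implements LongSupplier {
        private long nowMillis;

        ManualTimeSource(long startMillis) {
            this.nowMillis = startMillis;
        }

        void advance(long duration, TimeUnit unit) {
            nowMillis += unit.toMillis(duration);
        }

        @Override
        public long getAsLong() {
            return nowMillis;
        }
    }

    /** Hypothetical stand-in for an uptime gauge that reads time only from the injected clock. */
    static final class UpTimeTracker {
        private final LongSupplier clockMillis;
        private boolean running;
        private long runningSinceMillis;

        UpTimeTracker(LongSupplier clockMillis) {
            this.clockMillis = clockMillis;
        }

        void onRunning() {
            running = true;
            runningSinceMillis = clockMillis.getAsLong();
        }

        void onLeftRunning() {
            running = false;
        }

        long getUpTimeMillis() {
            // Uptime is 0 whenever the job is not in the running state.
            return running ? clockMillis.getAsLong() - runningSinceMillis : 0L;
        }
    }

    public static void main(String[] args) {
        ManualTimeSource clock = new ManualTimeSource(0L);
        UpTimeTracker upTime = new UpTimeTracker(clock);

        upTime.onRunning();
        // Instead of Thread.sleep(10L), the test decides exactly how much time passes.
        clock.advance(10, TimeUnit.MILLISECONDS);
        System.out.println(upTime.getUpTimeMillis()); // prints 10

        upTime.onLeftRunning();
        System.out.println(upTime.getUpTimeMillis()); // prints 0
    }
}
```

The catch is the one raised above: if any collaborator (a state class, code inherited from SchedulerBase, or the ExecutionGraph) called `System.currentTimeMillis()` directly, advancing the manual clock would not affect it, and the gauges could disagree.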