TanYuxin-tyx commented on code in PR #21132:
URL: https://github.com/apache/flink/pull/21132#discussion_r1003979797


##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##########
@@ -263,7 +220,151 @@ public void testShortUpdateInterval() throws 
InterruptedException {
 
         fetcher.update();
 
-        assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(2));
+        
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(2);
+    }
+
+    @Test
+    public void testIgnoreUpdateRequestWhenFetchingMetrics() throws 
InterruptedException {
+        final long updateInterval = 1000L;
+        final long waitTimeBeforeReturnMetricResults = updateInterval * 2;
+        final Time timeout = Time.seconds(10L);
+        final AtomicInteger requestMetricQueryServiceGatewaysCounter = new 
AtomicInteger(0);
+        final JobID jobID = new JobID();
+        final ResourceID tmRID = ResourceID.generate();
+
+        // Create metric fetcher
+        MetricFetcher fetcher =
+                createMetricFetcherWithServiceGateways(
+                        jobID,
+                        tmRID,
+                        timeout,
+                        updateInterval,
+                        waitTimeBeforeReturnMetricResults,
+                        requestMetricQueryServiceGatewaysCounter);
+
+        fetcher.update();
+
+        final long start = System.currentTimeMillis();
+        long difference = 0L;
+
+        while (difference <= updateInterval) {
+            Thread.sleep((int) (updateInterval * 1.5f));
+            difference = System.currentTimeMillis() - start;
+        }
+
+        fetcher.update();
+
+        
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(1);
+    }
+
+    @Nonnull
+    private MetricFetcher createMetricFetcherWithServiceGateways(
+            JobID jobID,
+            ResourceID tmRID,
+            Time timeout,
+            long updateInterval,
+            long waitTimeBeforeReturnMetricResults,
+            @Nullable AtomicInteger requestMetricQueryServiceGatewaysCounter) {
+        final ExecutorService executor = 
java.util.concurrent.Executors.newSingleThreadExecutor();
+        // ========= setup QueryServices
+        // 
================================================================================
+
+        final MetricQueryServiceGateway jmQueryService =
+                new TestingMetricQueryServiceGateway.Builder()
+                        .setQueryMetricsSupplier(
+                                () ->
+                                        CompletableFuture.completedFuture(
+                                                new MetricDumpSerialization
+                                                        
.MetricSerializationResult(
+                                                        new byte[0],
+                                                        new byte[0],
+                                                        new byte[0],
+                                                        new byte[0],
+                                                        0,
+                                                        0,
+                                                        0,
+                                                        0)))
+                        .build();
+
+        MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer 
=
+                createRequestDumpAnswer(tmRID, jobID);
+        final MetricQueryServiceGateway tmQueryService =
+                new TestingMetricQueryServiceGateway.Builder()
+                        .setQueryMetricsSupplier(
+                                () -> {
+                                    if (waitTimeBeforeReturnMetricResults > 0) 
{
+                                        CompletableFuture<
+                                                        MetricDumpSerialization
+                                                                
.MetricSerializationResult>
+                                                metricsAnswerFuture = new 
CompletableFuture<>();
+                                        CompletableFuture.completedFuture(null)
+                                                .thenRunAsync(
+                                                        waitTimeMs(
+                                                                
waitTimeBeforeReturnMetricResults),
+                                                        executor)
+                                                .whenCompleteAsync(
+                                                        (ignore, throwable) -> 
{
+                                                            if (throwable != 
null) {
+                                                                
fail(throwable.getMessage());
+                                                            }
+                                                            
metricsAnswerFuture.complete(
+                                                                    
requestMetricsAnswer);
+                                                        });
+                                        return metricsAnswerFuture;
+                                    } else {
+                                        return 
CompletableFuture.completedFuture(
+                                                requestMetricsAnswer);
+                                    }
+                                })
+                        .build();
+
+        // ========= setup JobManager
+        // 
==================================================================================
+
+        final TestingRestfulGateway restfulGateway =
+                new TestingRestfulGateway.Builder()
+                        .setRequestMultipleJobDetailsSupplier(
+                                () ->
+                                        CompletableFuture.completedFuture(
+                                                new 
MultipleJobsDetails(Collections.emptyList())))
+                        .setRequestMetricQueryServiceGatewaysSupplier(
+                                () -> {
+                                    if 
(requestMetricQueryServiceGatewaysCounter != null) {
+                                        
requestMetricQueryServiceGatewaysCounter.incrementAndGet();
+                                    }
+                                    return CompletableFuture.completedFuture(
+                                            
Collections.singleton(jmQueryService.getAddress()));
+                                })
+                        
.setRequestTaskManagerMetricQueryServiceGatewaysSupplier(
+                                () ->
+                                        CompletableFuture.completedFuture(
+                                                Collections.singleton(
+                                                        Tuple2.of(
+                                                                tmRID,
+                                                                
tmQueryService.getAddress()))))
+                        .build();
+
+        final GatewayRetriever<RestfulGateway> retriever =
+                () -> CompletableFuture.completedFuture(restfulGateway);
+
+        // ========= start MetricFetcher testing
+        // 
=======================================================================
+        return new MetricFetcherImpl<>(
+                retriever,
+                address -> CompletableFuture.completedFuture(tmQueryService),
+                Executors.directExecutor(),
+                timeout,
+                updateInterval);
+    }
+
+    private static Runnable waitTimeMs(long sleepTimeMs) {
+        return () -> {
+            try {
+                Thread.sleep(sleepTimeMs);
+            } catch (Throwable throwable) {
+                fail(throwable.getMessage());

Review Comment:
   I have removed the method because only one situation used it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to