This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 26d7c09  [FLINK-26407][end-to-end-tests] Increase timeouts for 
MetricsAvailabilityITCase.
26d7c09 is described below

commit 26d7c09b2c505d1f34159e3e3a8a33be823a9f39
Author: David Moravek <[email protected]>
AuthorDate: Tue Mar 1 17:04:51 2022 +0100

    [FLINK-26407][end-to-end-tests] Increase timeouts for 
MetricsAvailabilityITCase.
---
 .../metrics/tests/MetricsAvailabilityITCase.java   | 40 ++++++++++++----------
 1 file changed, 22 insertions(+), 18 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
 
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
index b2593c8..6be55ead 100644
--- 
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
+++ 
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
@@ -57,8 +57,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -88,22 +86,23 @@ public class MetricsAvailabilityITCase extends TestLogger {
 
     @Test
     public void testReporter() throws Exception {
+        final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(10));
         try (ClusterController ignored = dist.startCluster(1)) {
             final RestClient restClient =
                     new RestClient(new Configuration(), 
scheduledExecutorService);
 
-            checkJobManagerMetricAvailability(restClient);
+            checkJobManagerMetricAvailability(restClient, deadline);
 
-            final Collection<ResourceID> taskManagerIds = 
getTaskManagerIds(restClient);
+            final Collection<ResourceID> taskManagerIds = 
getTaskManagerIds(restClient, deadline);
 
             for (final ResourceID taskManagerId : taskManagerIds) {
-                checkTaskManagerMetricAvailability(restClient, taskManagerId);
+                checkTaskManagerMetricAvailability(restClient, taskManagerId, 
deadline);
             }
         }
     }
 
-    private static void checkJobManagerMetricAvailability(final RestClient 
restClient)
-            throws Exception {
+    private static void checkJobManagerMetricAvailability(
+            final RestClient restClient, final Deadline deadline) throws 
Exception {
         final JobManagerMetricsHeaders headers = 
JobManagerMetricsHeaders.getInstance();
         final JobManagerMetricsMessageParameters parameters =
                 headers.getUnresolvedMessageParameters();
@@ -114,11 +113,12 @@ public class MetricsAvailabilityITCase extends TestLogger 
{
                 () ->
                         restClient.sendRequest(
                                 HOST, PORT, headers, parameters, 
EmptyRequestBody.getInstance()),
-                getMetricNamePredicate("numRegisteredTaskManagers"));
+                getMetricNamePredicate("numRegisteredTaskManagers"),
+                deadline);
     }
 
-    private static Collection<ResourceID> getTaskManagerIds(final RestClient 
restClient)
-            throws Exception {
+    private static Collection<ResourceID> getTaskManagerIds(
+            final RestClient restClient, final Deadline deadline) throws 
Exception {
         final TaskManagersHeaders headers = TaskManagersHeaders.getInstance();
 
         final TaskManagersInfo response =
@@ -130,7 +130,8 @@ public class MetricsAvailabilityITCase extends TestLogger {
                                         headers,
                                         EmptyMessageParameters.getInstance(),
                                         EmptyRequestBody.getInstance()),
-                        taskManagersInfo -> 
!taskManagersInfo.getTaskManagerInfos().isEmpty());
+                        taskManagersInfo -> 
!taskManagersInfo.getTaskManagerInfos().isEmpty(),
+                        deadline);
 
         return response.getTaskManagerInfos().stream()
                 .map(TaskManagerInfo::getResourceId)
@@ -138,7 +139,8 @@ public class MetricsAvailabilityITCase extends TestLogger {
     }
 
     private static void checkTaskManagerMetricAvailability(
-            final RestClient restClient, final ResourceID taskManagerId) 
throws Exception {
+            final RestClient restClient, final ResourceID taskManagerId, final 
Deadline deadline)
+            throws Exception {
         final TaskManagerMetricsHeaders headers = 
TaskManagerMetricsHeaders.getInstance();
         final TaskManagerMetricsMessageParameters parameters =
                 headers.getUnresolvedMessageParameters();
@@ -150,13 +152,15 @@ public class MetricsAvailabilityITCase extends TestLogger 
{
                 () ->
                         restClient.sendRequest(
                                 HOST, PORT, headers, parameters, 
EmptyRequestBody.getInstance()),
-                getMetricNamePredicate("Status.Network.TotalMemorySegments"));
+                getMetricNamePredicate("Status.Network.TotalMemorySegments"),
+                deadline);
     }
 
     private static <X> X fetchMetric(
             final SupplierWithException<CompletableFuture<X>, IOException> 
clientOperation,
-            final Predicate<X> predicate)
-            throws InterruptedException, ExecutionException, TimeoutException {
+            final Predicate<X> predicate,
+            final Deadline deadline)
+            throws InterruptedException, ExecutionException {
         final CompletableFuture<X> responseFuture =
                 FutureUtils.retrySuccessfulWithDelay(
                         () -> {
@@ -166,12 +170,12 @@ public class MetricsAvailabilityITCase extends TestLogger 
{
                                 throw new RuntimeException(e);
                             }
                         },
-                        Time.seconds(1),
-                        Deadline.fromNow(Duration.ofSeconds(5)),
+                        Time.milliseconds(100),
+                        deadline,
                         predicate,
                         new 
ScheduledExecutorServiceAdapter(scheduledExecutorService));
 
-        return responseFuture.get(30, TimeUnit.SECONDS);
+        return responseFuture.get();
     }
 
     private static Predicate<MetricCollectionResponseBody> 
getMetricNamePredicate(

Reply via email to