This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 26d7c09 [FLINK-26407][end-to-end-tests] Increase timeouts for MetricsAvailabilityITCase.
26d7c09 is described below
commit 26d7c09b2c505d1f34159e3e3a8a33be823a9f39
Author: David Moravek <[email protected]>
AuthorDate: Tue Mar 1 17:04:51 2022 +0100
[FLINK-26407][end-to-end-tests] Increase timeouts for MetricsAvailabilityITCase.
---
.../metrics/tests/MetricsAvailabilityITCase.java | 40 ++++++++++++----------
1 file changed, 22 insertions(+), 18 deletions(-)
diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
index b2593c8..6be55ead 100644
--- a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
+++ b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
@@ -57,8 +57,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -88,22 +86,23 @@ public class MetricsAvailabilityITCase extends TestLogger {
@Test
public void testReporter() throws Exception {
+ final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(10));
try (ClusterController ignored = dist.startCluster(1)) {
final RestClient restClient =
new RestClient(new Configuration(),
scheduledExecutorService);
- checkJobManagerMetricAvailability(restClient);
+ checkJobManagerMetricAvailability(restClient, deadline);
- final Collection<ResourceID> taskManagerIds =
getTaskManagerIds(restClient);
+ final Collection<ResourceID> taskManagerIds =
getTaskManagerIds(restClient, deadline);
for (final ResourceID taskManagerId : taskManagerIds) {
- checkTaskManagerMetricAvailability(restClient, taskManagerId);
+ checkTaskManagerMetricAvailability(restClient, taskManagerId,
deadline);
}
}
}
- private static void checkJobManagerMetricAvailability(final RestClient
restClient)
- throws Exception {
+ private static void checkJobManagerMetricAvailability(
+ final RestClient restClient, final Deadline deadline) throws
Exception {
final JobManagerMetricsHeaders headers =
JobManagerMetricsHeaders.getInstance();
final JobManagerMetricsMessageParameters parameters =
headers.getUnresolvedMessageParameters();
@@ -114,11 +113,12 @@ public class MetricsAvailabilityITCase extends TestLogger {
() ->
restClient.sendRequest(
HOST, PORT, headers, parameters,
EmptyRequestBody.getInstance()),
- getMetricNamePredicate("numRegisteredTaskManagers"));
+ getMetricNamePredicate("numRegisteredTaskManagers"),
+ deadline);
}
- private static Collection<ResourceID> getTaskManagerIds(final RestClient
restClient)
- throws Exception {
+ private static Collection<ResourceID> getTaskManagerIds(
+ final RestClient restClient, final Deadline deadline) throws
Exception {
final TaskManagersHeaders headers = TaskManagersHeaders.getInstance();
final TaskManagersInfo response =
@@ -130,7 +130,8 @@ public class MetricsAvailabilityITCase extends TestLogger {
headers,
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance()),
- taskManagersInfo ->
!taskManagersInfo.getTaskManagerInfos().isEmpty());
+ taskManagersInfo ->
!taskManagersInfo.getTaskManagerInfos().isEmpty(),
+ deadline);
return response.getTaskManagerInfos().stream()
.map(TaskManagerInfo::getResourceId)
@@ -138,7 +139,8 @@ public class MetricsAvailabilityITCase extends TestLogger {
}
private static void checkTaskManagerMetricAvailability(
- final RestClient restClient, final ResourceID taskManagerId)
throws Exception {
+ final RestClient restClient, final ResourceID taskManagerId, final
Deadline deadline)
+ throws Exception {
final TaskManagerMetricsHeaders headers =
TaskManagerMetricsHeaders.getInstance();
final TaskManagerMetricsMessageParameters parameters =
headers.getUnresolvedMessageParameters();
@@ -150,13 +152,15 @@ public class MetricsAvailabilityITCase extends TestLogger {
() ->
restClient.sendRequest(
HOST, PORT, headers, parameters,
EmptyRequestBody.getInstance()),
- getMetricNamePredicate("Status.Network.TotalMemorySegments"));
+ getMetricNamePredicate("Status.Network.TotalMemorySegments"),
+ deadline);
}
private static <X> X fetchMetric(
final SupplierWithException<CompletableFuture<X>, IOException>
clientOperation,
- final Predicate<X> predicate)
- throws InterruptedException, ExecutionException, TimeoutException {
+ final Predicate<X> predicate,
+ final Deadline deadline)
+ throws InterruptedException, ExecutionException {
final CompletableFuture<X> responseFuture =
FutureUtils.retrySuccessfulWithDelay(
() -> {
@@ -166,12 +170,12 @@ public class MetricsAvailabilityITCase extends TestLogger {
throw new RuntimeException(e);
}
},
- Time.seconds(1),
- Deadline.fromNow(Duration.ofSeconds(5)),
+ Time.milliseconds(100),
+ deadline,
predicate,
new
ScheduledExecutorServiceAdapter(scheduledExecutorService));
- return responseFuture.get(30, TimeUnit.SECONDS);
+ return responseFuture.get();
}
private static Predicate<MetricCollectionResponseBody>
getMetricNamePredicate(