zentol closed pull request #7056: [FLINK-10634][metrics][rest] Add metrics availability e2e test URL: https://github.com/apache/flink/pull/7056
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign (fork) pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml b/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml new file mode 100644 index 00000000000..eea4ef8458d --- /dev/null +++ b/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.8-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-metrics-availability-test</artifactId> + <version>1.8-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests-common</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>e2e-metric-availability</id> + <activation> + <property> + <name>e2e-metrics</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>e2e-metric-availability</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <includes> + <include>**/*ITCase.*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> diff --git 
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java new file mode 100644 index 00000000000..28a21354e5d --- /dev/null +++ b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.pache.flink.metrics.tests; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo; +import org.apache.flink.tests.util.FlinkDistribution; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * End-to-end test for the availability of metrics. + */ +public class MetricsAvailabilityITCase extends TestLogger { + + private static final String HOST = "localhost"; + private static final int PORT = 8081; + + @Rule + public final FlinkDistribution dist = new FlinkDistribution(); + + @Nullable + private static ScheduledExecutorService scheduledExecutorService = null; + + @BeforeClass + public static void startExecutor() { + scheduledExecutorService = Executors.newScheduledThreadPool(4); + } + + @AfterClass + public static void shutdownExecutor() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + + @Test + public void testReporter() throws Exception { + dist.startFlinkCluster(); + + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), scheduledExecutorService); + + checkJobManagerMetricAvailability(restClient); + + final Collection<ResourceID> taskManagerIds = getTaskManagerIds(restClient); + + for (final ResourceID taskManagerId : taskManagerIds) { + checkTaskManagerMetricAvailability(restClient, taskManagerId); + } + } + + private static void checkJobManagerMetricAvailability(final RestClient restClient) throws Exception { + final JobManagerMetricsHeaders headers = JobManagerMetricsHeaders.getInstance(); + final JobManagerMetricsMessageParameters parameters = headers.getUnresolvedMessageParameters(); + parameters.metricsFilterParameter.resolve(Collections.singletonList("numRegisteredTaskManagers")); + + fetchMetric(() -> + restClient.sendRequest(HOST, PORT, headers, parameters, EmptyRequestBody.getInstance()), + getMetricNamePredicate("numRegisteredTaskManagers")); + } + + private static Collection<ResourceID> getTaskManagerIds(final RestClient restClient) throws Exception { + final TaskManagersHeaders headers = 
TaskManagersHeaders.getInstance(); + + final TaskManagersInfo response = fetchMetric(() -> + restClient.sendRequest( + HOST, + PORT, + headers, + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()), + taskManagersInfo -> !taskManagersInfo.getTaskManagerInfos().isEmpty()); + + return response.getTaskManagerInfos().stream() + .map(TaskManagerInfo::getResourceId) + .collect(Collectors.toList()); + } + + private static void checkTaskManagerMetricAvailability(final RestClient restClient, final ResourceID taskManagerId) throws Exception { + final TaskManagerMetricsHeaders headers = TaskManagerMetricsHeaders.getInstance(); + final TaskManagerMetricsMessageParameters parameters = headers.getUnresolvedMessageParameters(); + parameters.taskManagerIdParameter.resolve(taskManagerId); + parameters.metricsFilterParameter.resolve(Collections.singletonList("Status.Network.TotalMemorySegments")); + + fetchMetric(() -> + restClient.sendRequest(HOST, PORT, headers, parameters, EmptyRequestBody.getInstance()), + getMetricNamePredicate("Status.Network.TotalMemorySegments")); + } + + private static <X> X fetchMetric(final SupplierWithException<CompletableFuture<X>, IOException> clientOperation, final Predicate<X> predicate) throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture<X> responseFuture = FutureUtils.retrySuccessfulWithDelay(() -> { + try { + return clientOperation.get(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + Time.seconds(1), + Deadline.fromNow(Duration.ofSeconds(5)), + predicate, + new ScheduledExecutorServiceAdapter(scheduledExecutorService)); + + return responseFuture.get(30, TimeUnit.SECONDS); + } + + private static Predicate<MetricCollectionResponseBody> getMetricNamePredicate(final String metricName) { + return response -> response.getMetrics().stream().anyMatch(metric -> metric.getId().equals(metricName)); + } +} diff --git 
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/resources/log4j-test.properties b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/resources/log4j-test.properties new file mode 100644 index 00000000000..f7425cd14c0 --- /dev/null +++ b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=INFO, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%m%n diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index c0dc48595a6..a2790092b8b 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -58,6 +58,7 @@ under the License. 
<module>flink-state-evolution-test</module> <module>flink-e2e-test-utils</module> <module>flink-end-to-end-tests-common</module> + <module>flink-metrics-availability-test</module> <module>flink-metrics-reporter-prometheus-test</module> <module>flink-heavy-deployment-stress-test</module> </modules> diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 915c02b644d..036796c582d 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -107,7 +107,7 @@ public void testJobManagerJMXMetricAccess() throws Exception { client.setDetached(true); client.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader()); - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> client.getJobStatus(jobGraph.getJobID()), Time.milliseconds(10), deadline, diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index 83976f16094..eb6b3ef8c50 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -1273,7 +1273,7 @@ public void close() throws Exception { // Free cluster resources clusterClient.cancel(jobId); // cancel() is non-blocking so do this to make sure the job finished - 
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobId), Time.milliseconds(50), deadline, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 73920304964..0f36a3af141 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -239,7 +239,7 @@ * @return Future which retries the given operation a given amount of times and delays the retry * in case the predicate isn't matched */ - public static <T> CompletableFuture<T> retrySuccesfulWithDelay( + public static <T> CompletableFuture<T> retrySuccessfulWithDelay( final Supplier<CompletableFuture<T>> operation, final Time retryDelay, final Deadline deadline, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java index 7fd63726e0e..311a30034c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java @@ -31,7 +31,7 @@ */ public class JobManagerMetricsMessageParameters extends MessageParameters { - private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter(); + public final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter(); @Override public Collection<MessagePathParameter<?>> getPathParameters() { diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java index d7e9381d185..b30e4b8b3ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java @@ -31,7 +31,7 @@ */ public class TaskManagerMetricsMessageParameters extends TaskManagerMessageParameters { - private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter(); + public final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter(); @Override public Collection<MessageQueryParameter<?>> getQueryParameters() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java index 59408a7304e..4a761215bc1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java @@ -31,7 +31,7 @@ */ public class TaskManagerMessageParameters extends MessageParameters { - private TaskManagerIdPathParameter taskManagerIdParameter = new TaskManagerIdPathParameter(); + public TaskManagerIdPathParameter taskManagerIdParameter = new TaskManagerIdPathParameter(); @Override public Collection<MessagePathParameter<?>> getPathParameters() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index 14aecc700ad..ddae8462539 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -109,7 +109,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { // Submit job and wait until running flink.runDetached(jobGraph); - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> flink.getJobStatus(jobGraph.getJobID()), Time.milliseconds(10), deadline, @@ -160,7 +160,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); // wait until the job is canceled - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> flink.getJobStatus(jobGraph.getJobID()), Time.milliseconds(10), deadline, diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 0ead861adba..d4063184b62 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -153,7 +153,7 @@ public void go() throws Exception { try { NotifyingMapper.notifyLatch.await(); - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> { try { return CompletableFuture.completedFuture(client.getAccumulators(jobGraph.getJobID())); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 2cd2bbb60e9..4f607691c1b 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -200,7 +200,7 @@ private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceF client.cancel(jobId); - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> client.getJobStatus(jobId), Time.milliseconds(50), Deadline.now().plus(Duration.ofSeconds(30)), diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java index 7c00de7f436..9bec331608e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java @@ -258,7 +258,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO // now the job should be able to go to RUNNING again and then eventually to FINISHED, // which it only does if it could successfully restore - CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobID), Time.milliseconds(50), deadline, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java index ec3f1e1b9d3..e83f9ab8c58 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java @@ -393,7 +393,7 @@ public void run() { } private void waitForTaskManagers(int 
numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException { - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())), Time.milliseconds(50L), org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())), diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 90ea8796ca5..ed987d677d8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -294,7 +294,7 @@ static DispatcherGateway retrieveDispatcherGateway(RpcService rpcService, HighAv } private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException { - FutureUtils.retrySuccesfulWithDelay( + FutureUtils.retrySuccessfulWithDelay( () -> dispatcherGateway.requestClusterOverview(timeout), Time.milliseconds(50L), Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())), @@ -306,7 +306,7 @@ private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time } private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException { - return FutureUtils.retrySuccesfulWithDelay( + return FutureUtils.retrySuccessfulWithDelay( CheckedSupplier.unchecked(clusterClient::listJobs), Time.milliseconds(50L), Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())), diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index 097616feb96..0b79af513f6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -118,7 +118,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClien clusterClient.submitJob(jobToMigrate, classLoader); - CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobToMigrate.getJobID()), Time.milliseconds(50), deadline, @@ -152,7 +152,7 @@ private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClien assertNotNull("Could not take savepoint.", savepointPath); - CompletableFuture<JobStatus> jobCanceledFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture<JobStatus> jobCanceledFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobToMigrate.getJobID()), Time.milliseconds(50), deadline, @@ -173,7 +173,7 @@ private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, clusterClient.submitJob(jobToRestore, classLoader); - CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay( + CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccessfulWithDelay( () -> clusterClient.getJobStatus(jobToRestore.getJobID()), Time.milliseconds(50), deadline, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services