zentol closed pull request #7056: [FLINK-10634][metrics][rest] Add metrics 
availability e2e test
URL: https://github.com/apache/flink/pull/7056
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
As GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml 
b/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml
new file mode 100644
index 00000000000..eea4ef8458d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-metrics-availability-test/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.8-SNAPSHOT</version>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-metrics-availability-test</artifactId>
+       <version>1.8-SNAPSHOT</version>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-end-to-end-tests-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <profiles>
+               <profile>
+                       <id>e2e-metric-availability</id>
+                       <activation>
+                               <property>
+                                       <name>e2e-metrics</name>
+                               </property>
+                       </activation>
+                       <build>
+                               <plugins>
+                                       <plugin>
+                                               
<groupId>org.apache.maven.plugins</groupId>
+                                               
<artifactId>maven-surefire-plugin</artifactId>
+                                               <executions>
+                                                       <execution>
+                                                               
<id>e2e-metric-availability</id>
+                                                               
<phase>integration-test</phase>
+                                                               <goals>
+                                                                       
<goal>test</goal>
+                                                               </goals>
+                                                               <configuration>
+                                                                       
<includes>
+                                                                               
<include>**/*ITCase.*</include>
+                                                                       
</includes>
+                                                               </configuration>
+                                                       </execution>
+                                               </executions>
+                                       </plugin>
+                               </plugins>
+                       </build>
+               </profile>
+       </profiles>
+
+</project>
diff --git 
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java
 
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java
new file mode 100644
index 00000000000..28a21354e5d
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/pache/flink/metrics/tests/MetricsAvailabilityITCase.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.pache.flink.metrics.tests;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsMessageParameters;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
+import org.apache.flink.tests.util.FlinkDistribution;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * End-to-end test for the availability of metrics.
+ */
+public class MetricsAvailabilityITCase extends TestLogger {
+
+       private static final String HOST = "localhost";
+       private static final int PORT = 8081;
+
+       @Rule
+       public final FlinkDistribution dist = new FlinkDistribution();
+
+       @Nullable
+       private static ScheduledExecutorService scheduledExecutorService = null;
+
+       @BeforeClass
+       public static void startExecutor() {
+               scheduledExecutorService = Executors.newScheduledThreadPool(4);
+       }
+
+       @AfterClass
+       public static void shutdownExecutor() {
+               if (scheduledExecutorService != null) {
+                       scheduledExecutorService.shutdown();
+               }
+       }
+
+       @Test
+       public void testReporter() throws Exception {
+               dist.startFlinkCluster();
+
+               final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), 
scheduledExecutorService);
+
+               checkJobManagerMetricAvailability(restClient);
+
+               final Collection<ResourceID> taskManagerIds = 
getTaskManagerIds(restClient);
+
+               for (final ResourceID taskManagerId : taskManagerIds) {
+                       checkTaskManagerMetricAvailability(restClient, 
taskManagerId);
+               }
+       }
+
+       private static void checkJobManagerMetricAvailability(final RestClient 
restClient) throws Exception {
+               final JobManagerMetricsHeaders headers = 
JobManagerMetricsHeaders.getInstance();
+               final JobManagerMetricsMessageParameters parameters = 
headers.getUnresolvedMessageParameters();
+               
parameters.metricsFilterParameter.resolve(Collections.singletonList("numRegisteredTaskManagers"));
+
+               fetchMetric(() ->
+                               restClient.sendRequest(HOST, PORT, headers, 
parameters, EmptyRequestBody.getInstance()),
+                       getMetricNamePredicate("numRegisteredTaskManagers"));
+       }
+
+       private static Collection<ResourceID> getTaskManagerIds(final 
RestClient restClient) throws Exception {
+               final TaskManagersHeaders headers = 
TaskManagersHeaders.getInstance();
+
+               final TaskManagersInfo response = fetchMetric(() ->
+                               restClient.sendRequest(
+                                       HOST,
+                                       PORT,
+                                       headers,
+                                       EmptyMessageParameters.getInstance(),
+                                       EmptyRequestBody.getInstance()),
+                       taskManagersInfo -> 
!taskManagersInfo.getTaskManagerInfos().isEmpty());
+
+               return response.getTaskManagerInfos().stream()
+                       .map(TaskManagerInfo::getResourceId)
+                       .collect(Collectors.toList());
+       }
+
+       private static void checkTaskManagerMetricAvailability(final RestClient 
restClient, final ResourceID taskManagerId) throws Exception {
+               final TaskManagerMetricsHeaders headers = 
TaskManagerMetricsHeaders.getInstance();
+               final TaskManagerMetricsMessageParameters parameters = 
headers.getUnresolvedMessageParameters();
+               parameters.taskManagerIdParameter.resolve(taskManagerId);
+               
parameters.metricsFilterParameter.resolve(Collections.singletonList("Status.Network.TotalMemorySegments"));
+
+               fetchMetric(() ->
+                               restClient.sendRequest(HOST, PORT, headers, 
parameters, EmptyRequestBody.getInstance()),
+                       
getMetricNamePredicate("Status.Network.TotalMemorySegments"));
+       }
+
+       private static <X> X fetchMetric(final 
SupplierWithException<CompletableFuture<X>, IOException> clientOperation, final 
Predicate<X> predicate) throws InterruptedException, ExecutionException, 
TimeoutException {
+               final CompletableFuture<X> responseFuture = 
FutureUtils.retrySuccessfulWithDelay(() -> {
+                               try {
+                                       return clientOperation.get();
+                               } catch (IOException e) {
+                                       throw new RuntimeException(e);
+                               }
+                       },
+                       Time.seconds(1),
+                       Deadline.fromNow(Duration.ofSeconds(5)),
+                       predicate,
+                       new 
ScheduledExecutorServiceAdapter(scheduledExecutorService));
+
+               return responseFuture.get(30, TimeUnit.SECONDS);
+       }
+
+       private static Predicate<MetricCollectionResponseBody> 
getMetricNamePredicate(final String metricName) {
+               return response -> 
response.getMetrics().stream().anyMatch(metric -> 
metric.getId().equals(metricName));
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/resources/log4j-test.properties
 
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..f7425cd14c0
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=INFO, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%m%n
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index c0dc48595a6..a2790092b8b 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -58,6 +58,7 @@ under the License.
                <module>flink-state-evolution-test</module>
                <module>flink-e2e-test-utils</module>
                <module>flink-end-to-end-tests-common</module>
+               <module>flink-metrics-availability-test</module>
                <module>flink-metrics-reporter-prometheus-test</module>
                <module>flink-heavy-deployment-stress-test</module>
        </modules>
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 915c02b644d..036796c582d 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -107,7 +107,7 @@ public void testJobManagerJMXMetricAccess() throws 
Exception {
                        client.setDetached(true);
                        client.submitJob(jobGraph, 
JMXJobManagerMetricTest.class.getClassLoader());
 
-                       FutureUtils.retrySuccesfulWithDelay(
+                       FutureUtils.retrySuccessfulWithDelay(
                                () -> client.getJobStatus(jobGraph.getJobID()),
                                Time.milliseconds(10),
                                deadline,
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 83976f16094..eb6b3ef8c50 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -1273,7 +1273,7 @@ public void close() throws Exception {
                        // Free cluster resources
                        clusterClient.cancel(jobId);
                        // cancel() is non-blocking so do this to make sure the 
job finished
-                       CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccesfulWithDelay(
+                       CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccessfulWithDelay(
                                () -> clusterClient.getJobStatus(jobId),
                                Time.milliseconds(50),
                                deadline,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 73920304964..0f36a3af141 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -239,7 +239,7 @@
         * @return Future which retries the given operation a given amount of 
times and delays the retry
         *   in case the predicate isn't matched
         */
-       public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+       public static <T> CompletableFuture<T> retrySuccessfulWithDelay(
                final Supplier<CompletableFuture<T>> operation,
                final Time retryDelay,
                final Deadline deadline,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java
index 7fd63726e0e..311a30034c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsMessageParameters.java
@@ -31,7 +31,7 @@
  */
 public class JobManagerMetricsMessageParameters extends MessageParameters {
 
-       private final MetricsFilterParameter metricsFilterParameter = new 
MetricsFilterParameter();
+       public final MetricsFilterParameter metricsFilterParameter = new 
MetricsFilterParameter();
 
        @Override
        public Collection<MessagePathParameter<?>> getPathParameters() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java
index d7e9381d185..b30e4b8b3ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsMessageParameters.java
@@ -31,7 +31,7 @@
  */
 public class TaskManagerMetricsMessageParameters extends 
TaskManagerMessageParameters {
 
-       private final MetricsFilterParameter metricsFilterParameter = new 
MetricsFilterParameter();
+       public final MetricsFilterParameter metricsFilterParameter = new 
MetricsFilterParameter();
 
        @Override
        public Collection<MessageQueryParameter<?>> getQueryParameters() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
index 59408a7304e..4a761215bc1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerMessageParameters.java
@@ -31,7 +31,7 @@
  */
 public class TaskManagerMessageParameters extends MessageParameters {
 
-       private TaskManagerIdPathParameter taskManagerIdParameter = new 
TaskManagerIdPathParameter();
+       public TaskManagerIdPathParameter taskManagerIdParameter = new 
TaskManagerIdPathParameter();
 
        @Override
        public Collection<MessagePathParameter<?>> getPathParameters() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 14aecc700ad..ddae8462539 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -109,7 +109,7 @@ public void testCancelAsyncProducerAndConsumer() throws 
Exception {
                // Submit job and wait until running
                flink.runDetached(jobGraph);
 
-               FutureUtils.retrySuccesfulWithDelay(
+               FutureUtils.retrySuccessfulWithDelay(
                        () -> flink.getJobStatus(jobGraph.getJobID()),
                        Time.milliseconds(10),
                        deadline,
@@ -160,7 +160,7 @@ public void testCancelAsyncProducerAndConsumer() throws 
Exception {
                        .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
 
                // wait until the job is canceled
-               FutureUtils.retrySuccesfulWithDelay(
+               FutureUtils.retrySuccessfulWithDelay(
                        () -> flink.getJobStatus(jobGraph.getJobID()),
                        Time.milliseconds(10),
                        deadline,
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 0ead861adba..d4063184b62 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -153,7 +153,7 @@ public void go() throws Exception {
                try {
                        NotifyingMapper.notifyLatch.await();
 
-                       FutureUtils.retrySuccesfulWithDelay(
+                       FutureUtils.retrySuccessfulWithDelay(
                                () -> {
                                        try {
                                                return 
CompletableFuture.completedFuture(client.getAccumulators(jobGraph.getJobID()));
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2cd2bbb60e9..4f607691c1b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -200,7 +200,7 @@ private void restoreJobAndVerifyState(String savepointPath, 
MiniClusterResourceF
 
                        client.cancel(jobId);
 
-                       FutureUtils.retrySuccesfulWithDelay(
+                       FutureUtils.retrySuccessfulWithDelay(
                                () -> client.getJobStatus(jobId),
                                Time.milliseconds(50),
                                Deadline.now().plus(Duration.ofSeconds(30)),
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 7c00de7f436..9bec331608e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -258,7 +258,7 @@ public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) throws IO
 
                // now the job should be able to go to RUNNING again and then 
eventually to FINISHED,
                // which it only does if it could successfully restore
-               CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccesfulWithDelay(
+               CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccessfulWithDelay(
                        () -> clusterClient.getJobStatus(jobID),
                        Time.milliseconds(50),
                        deadline,
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index ec3f1e1b9d3..e83f9ab8c58 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -393,7 +393,7 @@ public void run() {
        }
 
        private void waitForTaskManagers(int numberOfTaskManagers, 
DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws 
ExecutionException, InterruptedException {
-               FutureUtils.retrySuccesfulWithDelay(
+               FutureUtils.retrySuccessfulWithDelay(
                        () -> 
dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())),
                        Time.milliseconds(50L),
                        
org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())),
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 90ea8796ca5..ed987d677d8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -294,7 +294,7 @@ static DispatcherGateway 
retrieveDispatcherGateway(RpcService rpcService, HighAv
        }
 
        private void waitUntilAllSlotsAreUsed(DispatcherGateway 
dispatcherGateway, Time timeout) throws ExecutionException, 
InterruptedException {
-               FutureUtils.retrySuccesfulWithDelay(
+               FutureUtils.retrySuccessfulWithDelay(
                        () -> dispatcherGateway.requestClusterOverview(timeout),
                        Time.milliseconds(50L),
                        
Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
@@ -306,7 +306,7 @@ private void waitUntilAllSlotsAreUsed(DispatcherGateway 
dispatcherGateway, Time
        }
 
        private Collection<JobID> waitForRunningJobs(ClusterClient<?> 
clusterClient, Time timeout) throws ExecutionException, InterruptedException {
-               return FutureUtils.retrySuccesfulWithDelay(
+               return FutureUtils.retrySuccessfulWithDelay(
                                
CheckedSupplier.unchecked(clusterClient::listJobs),
                                Time.milliseconds(50L),
                                
Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 097616feb96..0b79af513f6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -118,7 +118,7 @@ private String migrateJob(ClassLoader classLoader, 
ClusterClient<?> clusterClien
 
                clusterClient.submitJob(jobToMigrate, classLoader);
 
-               CompletableFuture<JobStatus> jobRunningFuture = 
FutureUtils.retrySuccesfulWithDelay(
+               CompletableFuture<JobStatus> jobRunningFuture = 
FutureUtils.retrySuccessfulWithDelay(
                        () -> 
clusterClient.getJobStatus(jobToMigrate.getJobID()),
                        Time.milliseconds(50),
                        deadline,
@@ -152,7 +152,7 @@ private String migrateJob(ClassLoader classLoader, 
ClusterClient<?> clusterClien
 
                assertNotNull("Could not take savepoint.", savepointPath);
 
-               CompletableFuture<JobStatus> jobCanceledFuture = 
FutureUtils.retrySuccesfulWithDelay(
+               CompletableFuture<JobStatus> jobCanceledFuture = 
FutureUtils.retrySuccessfulWithDelay(
                        () -> 
clusterClient.getJobStatus(jobToMigrate.getJobID()),
                        Time.milliseconds(50),
                        deadline,
@@ -173,7 +173,7 @@ private void restoreJob(ClassLoader classLoader, 
ClusterClient<?> clusterClient,
 
                clusterClient.submitJob(jobToRestore, classLoader);
 
-               CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccesfulWithDelay(
+               CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccessfulWithDelay(
                        () -> 
clusterClient.getJobStatus(jobToRestore.getJobID()),
                        Time.milliseconds(50),
                        deadline,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to