[ https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656373#comment-16656373 ]

ASF GitHub Bot commented on FLINK-10309:
----------------------------------------

asfgit closed pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 9eaef34a33a..b2d6d150561 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -445,9 +445,7 @@ private Configuration generateClusterConfiguration(Configuration configuration)
        private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
                synchronized (lock) {
                        if (clusterComponent != null) {
-                               final CompletableFuture<Void> deregisterApplicationFuture = clusterComponent.deregisterApplication(applicationStatus, diagnostics);
-
-                               return FutureUtils.runAfterwards(deregisterApplicationFuture, clusterComponent::closeAsync);
+                               return clusterComponent.deregisterApplicationAndClose(applicationStatus, diagnostics);
                        } else {
                                return CompletableFuture.completedFuture(null);
                        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index b07095c2b6d..6e28ab6c6b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -26,7 +26,6 @@
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nonnull;
@@ -41,7 +40,7 @@
 * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
 * in the same process.
 */
-public class DispatcherResourceManagerComponent<T extends Dispatcher> implements AutoCloseableAsync {
+public class DispatcherResourceManagerComponent<T extends Dispatcher> {
 
        @Nonnull
        private final T dispatcher;
@@ -126,65 +125,77 @@ public T getDispatcher() {
                return webMonitorEndpoint;
        }
 
-       @Override
-       public CompletableFuture<Void> closeAsync() {
+       /**
+        * Deregister the Flink application from the resource management system by signalling
+        * the {@link ResourceManager}.
+        *
+        * @param applicationStatus to terminate the application with
+        * @param diagnostics additional information about the shut down, can be {@code null}
+        * @return Future which is completed once the shut down
+        */
+       public CompletableFuture<Void> deregisterApplicationAndClose(
+                       final ApplicationStatus applicationStatus,
+                       final @Nullable String diagnostics) {
+
                if (isRunning.compareAndSet(true, false)) {
-                       Exception exception = null;
+                       final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
+                               FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () -> deregisterApplication(applicationStatus, diagnostics));
 
-                       final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
+                       return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture, this::closeAsyncInternal);
+               } else {
+                       return terminationFuture;
+               }
+       }
 
-                       try {
-                               dispatcherLeaderRetrievalService.stop();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, exception);
-                       }
+       private CompletableFuture<Void> deregisterApplication(
+                       final ApplicationStatus applicationStatus,
+                       final @Nullable String diagnostics) {
 
-                       try {
-                               resourceManagerRetrievalService.stop();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, exception);
-                       }
+               final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+               return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+       }
 
-                       terminationFutures.add(webMonitorEndpoint.closeAsync());
+       private CompletableFuture<Void> closeAsyncInternal() {
+               Exception exception = null;
 
-                       dispatcher.shutDown();
-                       terminationFutures.add(dispatcher.getTerminationFuture());
+               final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
 
-                       resourceManager.shutDown();
-                       terminationFutures.add(resourceManager.getTerminationFuture());
+               try {
+                       dispatcherLeaderRetrievalService.stop();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
+               }
 
-                       if (exception != null) {
-                               terminationFutures.add(FutureUtils.completedExceptionally(exception));
-                       }
+               try {
+                       resourceManagerRetrievalService.stop();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
+               }
 
-                       final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+               dispatcher.shutDown();
+               terminationFutures.add(dispatcher.getTerminationFuture());
 
-                       final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards(
-                               componentTerminationFuture,
-                               jobManagerMetricGroup::close);
+               resourceManager.shutDown();
+               terminationFutures.add(resourceManager.getTerminationFuture());
 
-                       metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
-                               if (throwable != null) {
-                                       terminationFuture.completeExceptionally(throwable);
-                               } else {
-                                       terminationFuture.complete(aVoid);
-                               }
-                       });
+               if (exception != null) {
+                       terminationFutures.add(FutureUtils.completedExceptionally(exception));
                }
 
-               return terminationFuture;
-       }
+               final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
 
-       /**
-        * Deregister the Flink application from the resource management system by signalling
-        * the {@link ResourceManager}.
-        *
-        * @param applicationStatus to terminate the application with
-        * @param diagnostics additional information about the shut down, can be {@code null}
-        * @return Future which is completed once the shut down
-        */
-       public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
-               final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-               return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+               final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards(
+                       componentTerminationFuture,
+                       jobManagerMetricGroup::close);
+
+               metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
+                       if (throwable != null) {
+                               terminationFuture.completeExceptionally(throwable);
+                       } else {
+                               terminationFuture.complete(aVoid);
+                       }
+               });
+
+               return terminationFuture;
        }
 }
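The net effect of the hunks above is a strict shutdown order: the WebMonitorEndpoint is closed first (so in-flight REST work can finish), the application is then deregistered at the ResourceManager, and only afterwards are the remaining components torn down. Below is a minimal sketch of that ordering with plain CompletableFutures — an editor's illustration with placeholder step names, not code from the PR:

import java.util.concurrent.CompletableFuture;

public class ShutdownOrderSketch {

    // Stands in for an asynchronous shutdown step; each step here completes immediately.
    private static CompletableFuture<Void> step(String name) {
        System.out.println(name);
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done =
            step("1. close WebMonitorEndpoint (drains in-flight REST requests)")
                .thenCompose(ignored -> step("2. deregister application at the ResourceManager"))
                .thenCompose(ignored -> step("3. stop dispatcher, resource manager and metric group"));
        done.join();
    }
}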
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 31c6a97124b..5d2d363cf71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -679,7 +679,7 @@ public void acknowledgeCheckpoint(
                                try {
                                        checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
                                } catch (Throwable t) {
-                                       log.warn("Error while processing checkpoint acknowledgement message");
+                                       log.warn("Error while processing checkpoint acknowledgement message", t);
                                }
                        });
                } else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 43636dd8f9e..068192cf794 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -63,6 +63,7 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * An abstract class for netty-based REST server endpoints.
@@ -85,6 +86,7 @@
 
        private final CompletableFuture<Void> terminationFuture;
 
+       private List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
        private ServerBootstrap bootstrap;
        private Channel serverChannel;
        private String restBaseUrl;
@@ -131,7 +133,7 @@ public final void start() throws Exception {
                        final Router router = new Router();
                        final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
 
-                       List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);
+                       handlers = initializeHandlers(restAddressFuture);
 
                        /* sort the handlers such that they are ordered the following:
                         * /jobs
@@ -265,10 +267,13 @@ public String getRestBaseUrl() {
                        log.info("Shutting down rest endpoint.");
 
                        if (state == State.RUNNING) {
-                               final CompletableFuture<Void> shutDownFuture = shutDownInternal();
+                               final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
+                                       closeHandlersAsync(),
+                                       this::shutDownInternal);
 
                                shutDownFuture.whenComplete(
                                        (Void ignored, Throwable throwable) -> {
+                                               log.info("Shut down complete.");
                                                if (throwable != null) {
                                                        terminationFuture.completeExceptionally(throwable);
                                                } else {
@@ -285,6 +290,14 @@ public String getRestBaseUrl() {
                }
        }
 
+       private FutureUtils.ConjunctFuture<Void> closeHandlersAsync() {
+               return FutureUtils.waitForAll(handlers.stream()
+                       .map(tuple -> tuple.f1)
+                       .filter(handler -> handler instanceof AutoCloseableAsync)
+                       .map(handler -> ((AutoCloseableAsync) handler).closeAsync())
+                       .collect(Collectors.toList()));
+       }
+
        /**
         * Stops this REST server endpoint.
         *
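closeHandlersAsync() above closes only those registered handlers that implement AutoCloseableAsync and waits for all of them before shutDownInternal() runs. The following self-contained sketch shows the same filter-and-wait pattern using CompletableFuture.allOf in place of Flink's FutureUtils.waitForAll (illustrative types, not from the PR):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseHandlersSketch {

    // Stand-in for org.apache.flink.util.AutoCloseableAsync.
    interface AsyncCloseable {
        CompletableFuture<Void> closeAsync();
    }

    static class ClosableHandler implements AsyncCloseable {
        public CompletableFuture<Void> closeAsync() {
            System.out.println("handler closed");
            return CompletableFuture.completedFuture(null);
        }
    }

    static class PlainHandler { }

    public static void main(String[] args) {
        List<Object> handlers = Arrays.asList(new ClosableHandler(), new PlainHandler());

        // Close only the handlers that support asynchronous closing and wait for all of them.
        CompletableFuture<Void> allClosed = CompletableFuture.allOf(
            handlers.stream()
                .filter(h -> h instanceof AsyncCloseable)
                .map(h -> ((AsyncCloseable) h).closeAsync())
                .toArray(CompletableFuture[]::new));

        allClosed.join(); // only now would shutDownInternal() be triggered
    }
}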
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
similarity index 87%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 3d1ec9d0066..5a1c371d5a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.rest;
+package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.FileUploads;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.HandlerRequestException;
-import org.apache.flink.runtime.rest.handler.RedirectHandler;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -33,6 +30,7 @@
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -64,7 +62,7 @@
  * @param <R> type of the incoming request
  * @param <M> type of the message parameters
  */
-public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends RedirectHandler<T> {
+public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends RedirectHandler<T> implements AutoCloseableAsync {
 
        protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -72,6 +70,11 @@
 
        private final UntypedResponseMessageHeaders<R, M> untypedResponseMessageHeaders;
 
+       /**
+        * Used to ensure that the handler is not closed while there are still in-flight requests.
+        */
+       private final InFlightRequestTracker inFlightRequestTracker;
+
        protected AbstractHandler(
                        @Nonnull CompletableFuture<String> localAddressFuture,
                        @Nonnull GatewayRetriever<? extends T> leaderRetriever,
@@ -81,6 +84,7 @@ protected AbstractHandler(
                super(localAddressFuture, leaderRetriever, timeout, responseHeaders);
 
                this.untypedResponseMessageHeaders = Preconditions.checkNotNull(untypedResponseMessageHeaders);
+               this.inFlightRequestTracker = new InFlightRequestTracker();
        }
 
        @Override
@@ -92,6 +96,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 
                FileUploads uploadedFiles = null;
                try {
+                       inFlightRequestTracker.registerRequest();
                        if (!(httpRequest instanceof FullHttpRequest)) {
                                // The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
                                // FullHttpRequests.
@@ -154,8 +159,12 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 
                        final FileUploads finalUploadedFiles = uploadedFiles;
                        requestProcessingFuture
-                               .whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+                               .whenComplete((Void ignored, Throwable throwable) -> {
+                                       inFlightRequestTracker.deregisterRequest();
+                                       cleanupFileUploads(finalUploadedFiles);
+                               });
                } catch (RestHandlerException rhe) {
+                       inFlightRequestTracker.deregisterRequest();
                        HandlerUtils.sendErrorResponse(
                                ctx,
                                httpRequest,
@@ -164,6 +173,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
                                responseHeaders);
                        cleanupFileUploads(uploadedFiles);
                } catch (Throwable e) {
+                       inFlightRequestTracker.deregisterRequest();
                        log.error("Request processing failed.", e);
                        HandlerUtils.sendErrorResponse(
                                ctx,
@@ -175,6 +185,15 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
                }
        }
 
+       @Override
+       public final CompletableFuture<Void> closeAsync() {
+               return FutureUtils.composeAfterwards(closeHandlerAsync(), inFlightRequestTracker::awaitAsync);
+       }
+
+       protected CompletableFuture<Void> closeHandlerAsync() {
+               return CompletableFuture.completedFuture(null);
+       }
+
        private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
                if (uploadedFiles != null) {
                        try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 9cfb58e98a8..0397cb875f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.rest.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -81,11 +80,9 @@ protected AbstractRestHandler(
                        response = FutureUtils.completedExceptionally(e);
                }
 
-               return response.whenComplete((P resp, Throwable throwable) -> {
-                       Tuple2<ResponseBody, HttpResponseStatus> r = throwable != null ?
-                               errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode());
-                       HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders);
-               }).thenApply(ignored -> null);
+               return response.handle((resp, throwable) -> throwable != null ?
+                       errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
+                       .thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
        }
 
        private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
new file mode 100644
index 00000000000..92478b1886f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Phaser;
+
+/**
+ * Tracks in-flight client requests.
+ *
+ * @see AbstractHandler
+ */
+@ThreadSafe
+class InFlightRequestTracker {
+
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private final Phaser phaser = new Phaser(1) {
+               @Override
+               protected boolean onAdvance(final int phase, final int registeredParties) {
+                       terminationFuture.complete(null);
+                       return true;
+               }
+       };
+
+       /**
+        * Registers an in-flight request.
+        */
+       public void registerRequest() {
+               phaser.register();
+       }
+
+       /**
+        * Deregisters an in-flight request.
+        */
+       public void deregisterRequest() {
+               phaser.arriveAndDeregister();
+       }
+
+       /**
+        * Returns a future that completes when the in-flight requests that were registered prior to
+        * calling this method are deregistered.
+        */
+       public CompletableFuture<Void> awaitAsync() {
+               phaser.arriveAndDeregister();
+               return terminationFuture;
+       }
+}
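The new tracker above builds on java.util.concurrent.Phaser: every request registers a party, and awaitAsync() deregisters the tracker's own party so the termination future completes once the last in-flight request has arrived. A standalone sketch of that behaviour (editor's illustration, not part of the diff):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;

public class PhaserTrackerSketch {

    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    // One party is held by the tracker itself; it arrives when awaitAsync() is called.
    private final Phaser phaser = new Phaser(1) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            terminationFuture.complete(null);
            return true; // terminate the phaser after the single shutdown phase
        }
    };

    void registerRequest() { phaser.register(); }

    void deregisterRequest() { phaser.arriveAndDeregister(); }

    CompletableFuture<Void> awaitAsync() {
        phaser.arriveAndDeregister();
        return terminationFuture;
    }

    public static void main(String[] args) {
        PhaserTrackerSketch tracker = new PhaserTrackerSketch();
        tracker.registerRequest();                       // request 1 starts
        CompletableFuture<Void> closed = tracker.awaitAsync();
        System.out.println(closed.isDone());             // false: request 1 is still in flight
        tracker.deregisterRequest();                     // request 1 finishes
        System.out.println(closed.isDone());             // true: the handler may now shut down
    }
}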
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
index e0c3fbec152..32b04921e42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.async;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.NotFoundException;
@@ -29,24 +28,14 @@
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.types.Either;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 /**
  * HTTP handlers for asynchronous operations.
@@ -176,7 +165,7 @@ protected StatusHandler(
                        final Either<Throwable, R> operationResultOrError;
                        try {
                                operationResultOrError = completedOperationCache.get(key);
-                       } catch (UnknownOperationKey e) {
+                       } catch (UnknownOperationKeyException e) {
                                return FutureUtils.completedExceptionally(
                                        new NotFoundException("Operation not found under key: " + key, e));
                        }
@@ -194,6 +183,11 @@ protected StatusHandler(
                        }
                }
 
+               @Override
+               public CompletableFuture<Void> closeHandlerAsync() {
+                       return completedOperationCache.closeAsync();
+               }
+
                /**
                 * Extract the operation key under which the operation result future is stored.
                 *
@@ -220,79 +214,4 @@ protected StatusHandler(
                protected abstract V operationResultResponse(R operationResult);
        }
 
-       /**
-        * Cache to manage ongoing operations.
-        *
-        * <p>The cache allows to register ongoing operations by calling
-        * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
-        * {@code CompletableFuture} contains the operation result. Completed operations will be
-        * removed from the cache automatically after a fixed timeout.
-        */
-       @ThreadSafe
-       protected static class CompletedOperationCache<K, R> {
-
-               private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
-
-               /**
-                * Stores SavepointKeys of ongoing savepoint.
-                * If the savepoint completes, it will be moved to {@link #completedOperations}.
-                */
-               private final Set<K> registeredOperationTriggers = ConcurrentHashMap.newKeySet();
-
-               /** Caches the location of completed operations. */
-               private final Cache<K, Either<Throwable, R>> completedOperations =
-                       CacheBuilder.newBuilder()
-                               .expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
-                               .build();
-
-               /**
-                * Registers an ongoing operation with the cache.
-                *
-                * @param operationResultFuture A future containing the operation result.
-                */
-               public void registerOngoingOperation(
-                       final K operationKey,
-                       final CompletableFuture<R> operationResultFuture) {
-                       registeredOperationTriggers.add(operationKey);
-                       operationResultFuture.whenComplete((savepointLocation, error) -> {
-                               if (error == null) {
-                                       completedOperations.put(operationKey, Either.Right(savepointLocation));
-                               } else {
-                                       completedOperations.put(operationKey, Either.Left(error));
-                               }
-                               registeredOperationTriggers.remove(operationKey);
-                       });
-               }
-
-               /**
-                * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
-                * finished, otherwise {@code null}.
-                *
-                * @throws UnknownOperationKey If the operation is not found, and there is no ongoing
-                *                                   operation under the provided key.
-                */
-               @Nullable
-               public Either<Throwable, R> get(
-                       final K operationKey) throws UnknownOperationKey {
-                       Either<Throwable, R> operationResultOrError = null;
-                       if (!registeredOperationTriggers.contains(operationKey)
-                               && (operationResultOrError = completedOperations.getIfPresent(operationKey)) == null) {
-                               throw new UnknownOperationKey(operationKey);
-                       }
-                       return operationResultOrError;
-               }
-       }
-
-       /**
-        * Exception that indicates that there is no ongoing or completed savepoint for a given
-        * {@link JobID} and {@link TriggerId} pair.
-        */
-       static class UnknownOperationKey extends FlinkException {
-               private static final long serialVersionUID = 1L;
-
-               UnknownOperationKey(final Object operationKey) {
-                       super("No ongoing operation for " + operationKey);
-               }
-       }
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
new file mode 100644
index 00000000000..95bb2239fb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * <p>The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseableAsync {
+
+       private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+       private static final Logger LOGGER = LoggerFactory.getLogger(CompletedOperationCache.class);
+
+       /**
+        * In-progress asynchronous operations.
+        */
+       private final Map<K, ResultAccessTracker<R>> registeredOperationTriggers = new ConcurrentHashMap<>();
+
+       /**
+        * Caches the result of completed operations.
+        */
+       private final Cache<K, ResultAccessTracker<R>> completedOperations;
+
+       CompletedOperationCache() {
+               this(Ticker.systemTicker());
+       }
+
+       @VisibleForTesting
+       CompletedOperationCache(final Ticker ticker) {
+               completedOperations = CacheBuilder.newBuilder()
+                       .expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+                       .removalListener((RemovalListener<K, ResultAccessTracker<R>>) removalNotification -> {
+                               if (removalNotification.wasEvicted()) {
+                                       Preconditions.checkState(removalNotification.getKey() != null);
+                                       Preconditions.checkState(removalNotification.getValue() != null);
+
+                                       // When shutting down the cache, we wait until all results are accessed.
+                                       // When a result gets evicted from the cache, it will not be possible to access
+                                       // it any longer, and we might be in the process of shutting down, so we mark
+                                       // the result as accessed to avoid waiting indefinitely.
+                                       removalNotification.getValue().markAccessed();
+
+                                       LOGGER.info("Evicted result with trigger id {} because its TTL of {}s has expired.",
+                                               removalNotification.getKey().getTriggerId(),
+                                               COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS);
+                               }
+                       })
+                       .ticker(ticker)
+                       .build();
+       }
+
+       /**
+        * Registers an ongoing operation with the cache.
+        *
+        * @param operationResultFuture A future containing the operation result.
+        */
+       public void registerOngoingOperation(
+                       final K operationKey,
+                       final CompletableFuture<R> operationResultFuture) {
+               final ResultAccessTracker<R> inProgress = ResultAccessTracker.inProgress();
+               registeredOperationTriggers.put(operationKey, inProgress);
+               operationResultFuture.whenComplete((result, error) -> {
+                       if (error == null) {
+                               completedOperations.put(operationKey, inProgress.finishOperation(Either.Right(result)));
+                       } else {
+                               completedOperations.put(operationKey, inProgress.finishOperation(Either.Left(error)));
+                       }
+                       registeredOperationTriggers.remove(operationKey);
+               });
+       }
+
+       /**
+        * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
+        * finished, otherwise {@code null}.
+        *
+        * @throws UnknownOperationKeyException If the operation is not found, and there is no ongoing
+        *                                      operation under the provided key.
+        */
+       @Nullable
+       public Either<Throwable, R> get(
+                       final K operationKey) throws UnknownOperationKeyException {
+               ResultAccessTracker<R> resultAccessTracker;
+               if ((resultAccessTracker = registeredOperationTriggers.get(operationKey)) == null
+                       && (resultAccessTracker = completedOperations.getIfPresent(operationKey)) == null) {
+                       throw new UnknownOperationKeyException(operationKey);
+               }
+
+               return resultAccessTracker.accessOperationResultOrError();
+       }
+
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               return FutureUtils.orTimeout(
+                       asyncWaitForResultsToBeAccessed(),
+                       COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS,
+                       TimeUnit.SECONDS);
+       }
+
+       private CompletableFuture<Void> asyncWaitForResultsToBeAccessed() {
+               return FutureUtils.waitForAll(
+                       Stream.concat(registeredOperationTriggers.values().stream(), completedOperations.asMap().values().stream())
+                               .map(ResultAccessTracker::getAccessedFuture)
+                               .collect(Collectors.toList()));
+       }
+
+       @VisibleForTesting
+       void cleanUp() {
+               completedOperations.cleanUp();
+       }
+
+       /**
+        * Stores the result of an asynchronous operation, and tracks accesses to it.
+        */
+       private static class ResultAccessTracker<R> {
+
+               /** Result of an asynchronous operation. Null if operation is in progress. */
+               @Nullable
+               private final Either<Throwable, R> operationResultOrError;
+
+               /** Future that completes if a non-null {@link #operationResultOrError} is accessed. */
+               private final CompletableFuture<Void> accessed;
+
+               private static <R> ResultAccessTracker<R> inProgress() {
+                       return new ResultAccessTracker<>();
+               }
+
+               private ResultAccessTracker() {
+                       this.operationResultOrError = null;
+                       this.accessed = new CompletableFuture<>();
+               }
+
+               private ResultAccessTracker(final Either<Throwable, R> operationResultOrError, final CompletableFuture<Void> accessed) {
+                       this.operationResultOrError = checkNotNull(operationResultOrError);
+                       this.accessed = checkNotNull(accessed);
+               }
+
+               /**
+                * Creates a new instance of the tracker with the result of the asynchronous operation set.
+                */
+               public ResultAccessTracker<R> finishOperation(final Either<Throwable, R> operationResultOrError) {
+                       checkState(this.operationResultOrError == null);
+
+                       return new ResultAccessTracker<>(checkNotNull(operationResultOrError), this.accessed);
+               }
+
+               /**
+                * If present, returns the result of the asynchronous operation, and marks the result as
+                * accessed. If the result is not present, this method returns null.
+                */
+               @Nullable
+               public Either<Throwable, R> accessOperationResultOrError() {
+                       if (operationResultOrError != null) {
+                               markAccessed();
+                       }
+                       return operationResultOrError;
+               }
+
+               public CompletableFuture<Void> getAccessedFuture() {
+                       return accessed;
+               }
+
+               private void markAccessed() {
+                       accessed.complete(null);
+               }
+
+       }
+}
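The key property of CompletedOperationCache above is that closeAsync() only completes once every stored operation result has been accessed (or evicted), so a client that triggered, say, a savepoint can still poll its result during shutdown. A much-simplified sketch of that "wait until results are fetched" idea, without the Guava cache and TTL handling (editor's illustration, not part of the diff):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ResultAccessSketch {

    // Completed results, and one future per result that completes when the result is fetched.
    private final Map<String, String> results = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<Void>> accessed = new ConcurrentHashMap<>();

    void complete(String key, String result) {
        results.put(key, result);
        accessed.putIfAbsent(key, new CompletableFuture<>());
    }

    String get(String key) {
        String result = results.get(key);
        if (result != null) {
            accessed.get(key).complete(null); // mark the result as delivered to a client
        }
        return result;
    }

    CompletableFuture<Void> closeAsync() {
        // Completes once every result known at this point has been fetched at least once.
        return CompletableFuture.allOf(accessed.values().toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        ResultAccessSketch cache = new ResultAccessSketch();
        cache.complete("trigger-1", "savepoint-path");
        CompletableFuture<Void> shutdown = cache.closeAsync();
        System.out.println(shutdown.isDone()); // false: the result has not been fetched yet
        cache.get("trigger-1");
        System.out.println(shutdown.isDone()); // true: safe to close the REST handler
    }
}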
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java
new file mode 100644
index 00000000000..962b80fe5e6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception that indicates that there is no ongoing or completed savepoint for a given
+ * {@link JobID} and {@link TriggerId} pair.
+ */
+class UnknownOperationKeyException extends FlinkException {
+       private static final long serialVersionUID = 1L;
+
+       UnknownOperationKeyException(final Object operationKey) {
+               super("No ongoing operation for " + operationKey);
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
index 4bb473e9ceb..e773702274f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
@@ -33,7 +33,7 @@
  * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based
  * collection.
  *
- * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache
+ * @see AbstractAsynchronousOperationHandlers
  */
 @Immutable
 public class AsynchronousJobOperationKey extends OperationKey {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 4c7ac9406b1..8a20868ce37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,7 +24,7 @@
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
-import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index b407ada46e6..b60afbb5948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -45,6 +45,7 @@
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -69,7 +70,7 @@
         * @param headers additional header values
         * @param <P> type of the response
         */
-       public static <P extends ResponseBody> void sendResponse(
+       public static <P extends ResponseBody> CompletableFuture<Void> sendResponse(
                        ChannelHandlerContext channelHandlerContext,
                        HttpRequest httpRequest,
                        P response,
@@ -80,15 +81,14 @@
                        mapper.writeValue(sw, response);
                } catch (IOException ioe) {
                        LOG.error("Internal server error. Could not map response to JSON.", ioe);
-                       sendErrorResponse(
+                       return sendErrorResponse(
                                channelHandlerContext,
                                httpRequest,
                                new ErrorResponseBody("Internal server error. Could not map response to JSON."),
                                HttpResponseStatus.INTERNAL_SERVER_ERROR,
                                headers);
-                       return;
                }
-               sendResponse(
+               return sendResponse(
                        channelHandlerContext,
                        httpRequest,
                        sw.toString(),
@@ -105,14 +105,14 @@
         * @param statusCode of the message to send
         * @param headers additional header values
         */
-       public static void sendErrorResponse(
+       public static CompletableFuture<Void> sendErrorResponse(
                        ChannelHandlerContext channelHandlerContext,
                        HttpRequest httpRequest,
                        ErrorResponseBody errorMessage,
                        HttpResponseStatus statusCode,
                        Map<String, String> headers) {
 
-               sendErrorResponse(
+               return sendErrorResponse(
                        channelHandlerContext,
                        HttpHeaders.isKeepAlive(httpRequest),
                        errorMessage,
@@ -129,7 +129,7 @@ public static void sendErrorResponse(
         * @param statusCode of the message to send
         * @param headers additional header values
         */
-       public static void sendErrorResponse(
+       public static CompletableFuture<Void> sendErrorResponse(
                        ChannelHandlerContext channelHandlerContext,
                        boolean keepAlive,
                        ErrorResponseBody errorMessage,
@@ -142,14 +142,14 @@ public static void sendErrorResponse(
                } catch (IOException e) {
                        // this should never happen
                        LOG.error("Internal server error. Could not map error response to JSON.", e);
-                       sendResponse(
+                       return sendResponse(
                                channelHandlerContext,
                                keepAlive,
                                "Internal server error. Could not map error response to JSON.",
                                HttpResponseStatus.INTERNAL_SERVER_ERROR,
                                headers);
                }
-               sendResponse(
+               return sendResponse(
                        channelHandlerContext,
                        keepAlive,
                        sw.toString(),
@@ -166,14 +166,14 @@ public static void sendErrorResponse(
         * @param statusCode of the message to send
         * @param headers additional header values
         */
-       public static void sendResponse(
+       public static CompletableFuture<Void> sendResponse(
                        @Nonnull ChannelHandlerContext channelHandlerContext,
                        @Nonnull HttpRequest httpRequest,
                        @Nonnull String message,
                        @Nonnull HttpResponseStatus statusCode,
                        @Nonnull Map<String, String> headers) {
 
-               sendResponse(
+               return sendResponse(
                        channelHandlerContext,
                        HttpHeaders.isKeepAlive(httpRequest),
                        message,
@@ -190,7 +190,7 @@ public static void sendResponse(
         * @param statusCode of the message to send
         * @param headers additional header values
         */
-       public static void sendResponse(
+       public static CompletableFuture<Void> sendResponse(
                        @Nonnull ChannelHandlerContext channelHandlerContext,
                        boolean keepAlive,
                        @Nonnull String message,
@@ -223,5 +223,19 @@ public static void sendResponse(
                if (!keepAlive) {
                        lastContentFuture.addListener(ChannelFutureListener.CLOSE);
                }
+
+               return toCompletableFuture(lastContentFuture);
+       }
+
+       private static CompletableFuture<Void> toCompletableFuture(final ChannelFuture channelFuture) {
+               final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+               channelFuture.addListener(future -> {
+                       if (future.isSuccess()) {
+                               completableFuture.complete(null);
+                       } else {
+                               completableFuture.completeExceptionally(future.cause());
+                       }
+               });
+               return completableFuture;
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
index c289634bc07..cc116dd7714 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ManualTicker;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -281,20 +282,6 @@ private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File st
                        Ticker.systemTicker());
        }
 
-       private static final class ManualTicker extends Ticker {
-
-               private long currentTime = 0;
-
-               @Override
-               public long read() {
-                       return currentTime;
-               }
-
-               void advanceTime(long duration, TimeUnit timeUnit) {
-                       currentTime += timeUnit.toNanos(duration);
-               }
-       }
-
        private static final class PartialArchivedExecutionGraphMatcher extends BaseMatcher<ArchivedExecutionGraph> {
 
                private final ArchivedExecutionGraph archivedExecutionGraph;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 430bfad38fd..0c28745518e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -91,14 +91,18 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -131,6 +135,8 @@
        private SSLContext defaultSSLContext;
        private SSLSocketFactory defaultSSLSocketFactory;
 
+       private TestHandler testHandler;
+
        public RestServerEndpointITCase(final Configuration config) {
                this.config = requireNonNull(config);
        }
@@ -194,7 +200,7 @@ public void setup() throws Exception {
                final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
                        CompletableFuture.completedFuture(mockRestfulGateway);
 
-               TestHandler testHandler = new TestHandler(
+               testHandler = new TestHandler(
                        CompletableFuture.completedFuture(restAddress),
                        mockGatewayRetriever,
                        RpcUtils.INF_TIMEOUT);
@@ -253,7 +259,7 @@ public void teardown() throws Exception {
                }
 
                if (serverEndpoint != null) {
-                       serverEndpoint.close();
+                       serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit());
                        serverEndpoint = null;
                }
        }
@@ -264,37 +270,25 @@ public void teardown() throws Exception {
         */
        @Test
        public void testRequestInterleaving() throws Exception {
-
-               TestParameters parameters = new TestParameters();
-               parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-               parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+               final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+               testHandler.handlerBody = id -> {
+                       if (id == 1) {
+                               handlerBlocker.arriveAndBlock();
+                       }
+                       return CompletableFuture.completedFuture(new TestResponse(id));
+               };
 
                // send first request and wait until the handler blocks
-               CompletableFuture<TestResponse> response1;
-
-               synchronized (TestHandler.LOCK) {
-                       response1 = restClient.sendRequest(
-                               serverAddress.getHostName(),
-                               serverAddress.getPort(),
-                               new TestHeaders(),
-                               parameters,
-                               new TestRequest(1));
-                       TestHandler.LOCK.wait();
-               }
+               final CompletableFuture<TestResponse> response1 = sendRequestToTestHandler(new TestRequest(1));
+               handlerBlocker.awaitRequestToArrive();
 
                // send second request and verify response
-               CompletableFuture<TestResponse> response2 = restClient.sendRequest(
-                       serverAddress.getHostName(),
-                       serverAddress.getPort(),
-                       new TestHeaders(),
-                       parameters,
-                       new TestRequest(2));
+               final CompletableFuture<TestResponse> response2 = sendRequestToTestHandler(new TestRequest(2));
                assertEquals(2, response2.get().id);
 
                // wake up blocked handler
-               synchronized (TestHandler.LOCK) {
-                       TestHandler.LOCK.notifyAll();
-               }
+               handlerBlocker.unblockRequest();
+
                // verify response to first request
                assertEquals(1, response1.get().id);
        }
@@ -335,41 +329,34 @@ public void testBadHandlerRequest() throws Exception {
        }
 
        /**
-        * Tests that requests and responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH}
-        * are rejected by the server and client, respectively.
+        * Tests that requests larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} are rejected.
         */
        @Test
-       public void testMaxContentLengthLimit() throws Exception {
-               final TestParameters parameters = new TestParameters();
-               parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-               parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
-
-               CompletableFuture<TestResponse> response;
-               response = restClient.sendRequest(
-                       serverAddress.getHostName(),
-                       serverAddress.getPort(),
-                       new TestHeaders(),
-                       parameters,
-                       new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+       public void testShouldRespectMaxContentLengthLimitForRequests() throws Exception {
+               testHandler.handlerBody = id -> {
+                       throw new AssertionError("Request should not arrive at server.");
+               };
 
                try {
-                       response.get();
+                       sendRequestToTestHandler(new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH))).get();
                        fail("Expected exception not thrown");
                } catch (final ExecutionException e) {
                        final Throwable throwable = ExceptionUtils.stripExecutionException(e);
                        assertThat(throwable, instanceOf(RestClientException.class));
                        assertThat(throwable.getMessage(), containsString("Try to raise"));
                }
+       }
 
-               response = restClient.sendRequest(
-                       serverAddress.getHostName(),
-                       serverAddress.getPort(),
-                       new TestHeaders(),
-                       parameters,
-                       new TestRequest(TestHandler.LARGE_RESPONSE_BODY_ID));
+       /**
+        * Tests that responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} are rejected.
+        */
+       @Test
+       public void testShouldRespectMaxContentLengthLimitForResponses() throws Exception {
+               testHandler.handlerBody = id -> CompletableFuture.completedFuture(
+                       new TestResponse(id, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
 
                try {
-                       response.get();
+                       sendRequestToTestHandler(new TestRequest(1)).get();
                        fail("Expected exception not thrown");
                } catch (final ExecutionException e) {
                        final Throwable throwable = ExceptionUtils.stripExecutionException(e);
@@ -545,6 +532,43 @@ public void testNonSslRedirectForEnabledSsl() throws Exception {
                }
        }
 
+       /**
+        * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
+        * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
+        * HTTP requests should be served.
+        */
+       @Test
+       public void testShouldWaitForHandlersWhenClosing() throws Exception {
+               testHandler.closeFuture = new CompletableFuture<>();
+               final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+               testHandler.handlerBody = id -> {
+                       // Intentionally schedule the work on a different thread. This is to simulate
+                       // handlers where the CompletableFuture is finished by the RPC framework.
+                       return CompletableFuture.supplyAsync(() -> {
+                               handlerBlocker.arriveAndBlock();
+                               return new TestResponse(id);
+                       });
+               };
+
+               // Initiate closing RestServerEndpoint but the test handler should block.
+               final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+               assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+               final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+               handlerBlocker.awaitRequestToArrive();
+
+               // Allow handler to close but there is still one in-flight request which should prevent
+               // the RestServerEndpoint from closing.
+               testHandler.closeFuture.complete(null);
+               assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+               // Finish the in-flight request.
+               handlerBlocker.unblockRequest();
+
+               request.get(timeout.getSize(), timeout.getUnit());
+               closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+       }
+
        private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
                final HttpURLConnection connection =
                        (HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -587,9 +611,9 @@ protected void startInternal() {}
 
        private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
 
-               private static final Object LOCK = new Object();
+               private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
 
-               private static final int LARGE_RESPONSE_BODY_ID = 3;
+               private Function<Integer, CompletableFuture<TestResponse>> handlerBody;
 
                TestHandler(
                                CompletableFuture<String> localAddressFuture,
@@ -604,25 +628,89 @@ protected void startInternal() {}
                }
 
                @Override
-               protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) throws RestHandlerException {
+               protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) {
                        assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
                        assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), QUERY_JOB_ID);
 
                        final int id = request.getRequestBody().id;
-                       if (id == 1) {
-                               synchronized (LOCK) {
-                                       try {
-                                               LOCK.notifyAll();
-                                               LOCK.wait();
-                                       } catch (InterruptedException ignored) {
-                                       }
-                               }
-                       } else if (id == LARGE_RESPONSE_BODY_ID) {
-                               return CompletableFuture.completedFuture(new TestResponse(
-                                       id,
-                                       createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+                       return handlerBody.apply(id);
+               }
+
+               @Override
+               public CompletableFuture<Void> closeHandlerAsync() {
+                       return closeFuture;
+               }
+       }
+
+       private CompletableFuture<TestResponse> sendRequestToTestHandler(final TestRequest testRequest) {
+               try {
+                       return restClient.sendRequest(
+                               serverAddress.getHostName(),
+                               serverAddress.getPort(),
+                               new TestHeaders(),
+                               createTestParameters(),
+                               testRequest);
+               } catch (final IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private static TestParameters createTestParameters() {
+               final TestParameters parameters = new TestParameters();
+               parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+               parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+               return parameters;
+       }
+
+       /**
+        * This is a helper class for tests that require fine-grained control over HTTP
+        * requests so that they are not dispatched immediately.
+        */
+       private static class HandlerBlocker {
+
+               private final Time timeout;
+
+               private final CountDownLatch requestArrivedLatch = new CountDownLatch(1);
+
+               private final CountDownLatch finishRequestLatch = new CountDownLatch(1);
+
+               private HandlerBlocker(final Time timeout) {
+                       this.timeout = checkNotNull(timeout);
+               }
+
+               /**
+                * Waits until {@link #arriveAndBlock()} is called.
+                */
+               public void awaitRequestToArrive() {
+                       try {
+                               assertTrue(requestArrivedLatch.await(timeout.getSize(), timeout.getUnit()));
+                       } catch (final InterruptedException e) {
+                               Thread.currentThread().interrupt();
                        }
-                       return CompletableFuture.completedFuture(new TestResponse(id));
+               }
+
+               /**
+                * Signals that the request arrived. This method blocks until {@link #unblockRequest()} is
+                * called.
+                */
+               public void arriveAndBlock() {
+                       markRequestArrived();
+                       try {
+                               assertTrue(finishRequestLatch.await(timeout.getSize(), timeout.getUnit()));
+                       } catch (final InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       }
+               }
+
+               /**
+                * @see #arriveAndBlock()
+                */
+               public void unblockRequest() {
+                       finishRequestLatch.countDown();
+               }
+
+               private void markRequestArrived() {
+                       requestArrivedLatch.countDown();
                }
        }
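
For readers skimming the diff: the new test testShouldWaitForHandlersWhenClosing above pins down a
two-step shutdown order (the handler is closed first, then its in-flight requests are awaited). The
following is a minimal, self-contained sketch of that ordering only; it is not the code merged in
this PR, and every class and variable name in it is made up for illustration.

{code:java}
import java.util.concurrent.CompletableFuture;

// Sketch of the shutdown ordering exercised by testShouldWaitForHandlersWhenClosing:
// the endpoint's close future completes only after (1) the handler reports it is closed
// and (2) the last in-flight request has finished.
public final class CloseOrderingSketch {

    public static void main(String[] args) {
        final CompletableFuture<Void> handlerClosed = new CompletableFuture<>();
        final CompletableFuture<Void> inFlightRequestsDone = new CompletableFuture<>();

        // Endpoint shutdown: first wait for the handler, then for its in-flight requests.
        final CompletableFuture<Void> endpointClosed =
            handlerClosed.thenCompose(ignored -> inFlightRequestsDone);

        System.out.println(endpointClosed.isDone()); // false: handler not yet closed

        handlerClosed.complete(null);
        System.out.println(endpointClosed.isDone()); // false: one request still running

        inFlightRequestsDone.complete(null);
        System.out.println(endpointClosed.isDone()); // true: the endpoint may now shut down
    }
}
{code}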
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
similarity index 96%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
index 607c1c4bb06..ebb6656fdcd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.rest;
+package org.apache.flink.runtime.rest.handler;
 
-import org.apache.flink.runtime.rest.handler.FileUploads;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.router.RouteResult;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
new file mode 100644
index 00000000000..c486571d9cf
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link InFlightRequestTracker}.
+ */
+public class InFlightRequestTrackerTest {
+
+       private InFlightRequestTracker inFlightRequestTracker;
+
+       @Before
+       public void setUp() {
+               inFlightRequestTracker = new InFlightRequestTracker();
+       }
+
+       @Test
+       public void testShouldFinishAwaitAsyncImmediatelyIfNoRequests() {
+               assertTrue(inFlightRequestTracker.awaitAsync().isDone());
+       }
+
+       @Test
+       public void testShouldFinishAwaitAsyncIffAllRequestsDeregistered() {
+               inFlightRequestTracker.registerRequest();
+
+               final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+               assertFalse(awaitFuture.isDone());
+
+               inFlightRequestTracker.deregisterRequest();
+               assertTrue(awaitFuture.isDone());
+       }
+
+       @Test
+       public void testAwaitAsyncIsIdempotent() {
+               final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+               assertTrue(awaitFuture.isDone());
+
+               assertSame(
+                       "The reference to the future must not change",
+                       awaitFuture,
+                       inFlightRequestTracker.awaitAsync());
+       }
+
+       @Test
+       public void testShouldTolerateRegisterAfterAwaitAsync() {
+               final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+               assertTrue(awaitFuture.isDone());
+
+               inFlightRequestTracker.registerRequest();
+
+               assertSame(
+                       "The reference to the future must not change",
+                       awaitFuture,
+                       inFlightRequestTracker.awaitAsync());
+       }
+}
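
The tests above describe the contract of the new InFlightRequestTracker: awaitAsync() completes once
every registered request has been deregistered, always returns the same future, and tolerates
registrations after shutdown has started. Below is a minimal sketch that satisfies exactly these
tests; the real class added by this PR may be implemented differently, and everything beyond the
three method names exercised by the tests is an assumption.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal tracker satisfying the contract exercised by InFlightRequestTrackerTest.
final class SimpleInFlightRequestTracker {

    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    private final AtomicInteger inFlightRequests = new AtomicInteger(0);

    private volatile boolean terminated;

    void registerRequest() {
        inFlightRequests.incrementAndGet();
    }

    void deregisterRequest() {
        if (inFlightRequests.decrementAndGet() == 0 && terminated) {
            terminationFuture.complete(null);
        }
    }

    CompletableFuture<Void> awaitAsync() {
        terminated = true;
        if (inFlightRequests.get() == 0) {
            terminationFuture.complete(null);
        }
        // Always hand out the same future so repeated calls are idempotent.
        return terminationFuture;
    }
}
{code}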
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
index eeb41a829b8..fb384a938d1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
@@ -179,6 +179,32 @@ public void testUnknownTriggerId() throws Exception {
                }
        }
 
+       /**
+        * Tests that the future returned by {@link AbstractAsynchronousOperationHandlers.StatusHandler#closeAsync()}
+        * completes when the result of the asynchronous operation is served.
+        */
+       @Test
+       public void testCloseShouldFinishOnFirstServedResult() throws Exception {
+               final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+               final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
+                       .setTriggerSavepointFunction((JobID jobId, String directory) -> savepointFuture)
+                       .build();
+
+               final TriggerId triggerId = testingTriggerHandler.handleRequest(
+                       triggerOperationRequest(),
+                       testingRestfulGateway).get().getTriggerId();
+               final CompletableFuture<Void> closeFuture = testingStatusHandler.closeAsync();
+
+               testingStatusHandler.handleRequest(statusOperationRequest(triggerId), testingRestfulGateway).get();
+
+               assertThat(closeFuture.isDone(), is(false));
+
+               savepointFuture.complete("foobar");
+               testingStatusHandler.handleRequest(statusOperationRequest(triggerId), testingRestfulGateway).get();
+
+               assertThat(closeFuture.isDone(), is(true));
+       }
+
        private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters> triggerOperationRequest() throws HandlerRequestException {
                return new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance());
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
new file mode 100644
index 00000000000..2d13780f956
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.util.ManualTicker;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link CompletedOperationCache}.
+ */
+public class CompletedOperationCacheTest extends TestLogger {
+
+       private static final OperationKey TEST_OPERATION_KEY = new OperationKey(new TriggerId());
+
+       private static final CompletableFuture<String> TEST_OPERATION_RESULT = CompletableFuture.completedFuture("foo");
+
+       private ManualTicker manualTicker;
+
+       private CompletedOperationCache<OperationKey, String> completedOperationCache;
+
+       @Before
+       public void setUp() {
+               manualTicker = new ManualTicker();
+               completedOperationCache = new CompletedOperationCache<>(manualTicker);
+       }
+
+       @Test
+       public void testShouldFinishClosingCacheIfAllResultsAreEvicted() {
+               completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+               final CompletableFuture<Void> closeCacheFuture = completedOperationCache.closeAsync();
+               assertThat(closeCacheFuture.isDone(), is(false));
+
+               manualTicker.advanceTime(300, TimeUnit.SECONDS);
+               completedOperationCache.cleanUp();
+
+               assertThat(closeCacheFuture.isDone(), is(true));
+       }
+
+       @Test
+       public void testShouldFinishClosingCacheIfAllResultsAccessed() throws Exception {
+               completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+               final CompletableFuture<Void> closeCacheFuture = completedOperationCache.closeAsync();
+               assertThat(closeCacheFuture.isDone(), is(false));
+
+               final Either<Throwable, String> operationResultOrError = completedOperationCache.get(TEST_OPERATION_KEY);
+
+               assertThat(operationResultOrError, is(notNullValue()));
+               assertThat(operationResultOrError.right(), is(equalTo(TEST_OPERATION_RESULT.get())));
+               assertThat(closeCacheFuture.isDone(), is(true));
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
new file mode 100644
index 00000000000..d2e2e1d7bea
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Controllable {@link Ticker} implementation for tests.
+ */
+public final class ManualTicker extends Ticker {
+
+       private long currentTime;
+
+       @Override
+       public long read() {
+               return currentTime;
+       }
+
+       public void advanceTime(final long duration, final TimeUnit timeUnit) {
+               currentTime += timeUnit.toNanos(duration);
+       }
+}
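
ManualTicker only advances when the test tells it to, which is what makes the time-based eviction in
CompletedOperationCacheTest deterministic. The snippet below illustrates the same idea with plain
Guava classes rather than Flink's shaded copies or the CompletedOperationCache itself; the cache
settings and names are made up for the example.

{code:java}
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.TimeUnit;

public final class ManualTickerDemo {

    /** Same idea as ManualTicker above: time only moves when the test says so. */
    static final class TestTicker extends Ticker {
        private long nanos;

        @Override
        public long read() {
            return nanos;
        }

        void advance(final long duration, final TimeUnit timeUnit) {
            nanos += timeUnit.toNanos(duration);
        }
    }

    public static void main(String[] args) {
        final TestTicker ticker = new TestTicker();
        final Cache<String, String> cache = CacheBuilder.newBuilder()
            .ticker(ticker)
            .expireAfterWrite(5, TimeUnit.MINUTES)
            .build();

        cache.put("operation", "result");
        System.out.println(cache.getIfPresent("operation")); // "result"

        // Advance fake time past the expiry and force eviction processing, mirroring
        // manualTicker.advanceTime(300, TimeUnit.SECONDS) + cleanUp() in the test above.
        ticker.advance(6, TimeUnit.MINUTES);
        cache.cleanUp();
        System.out.println(cache.getIfPresent("operation")); // null
    }
}
{code}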
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index afca8f12100..c5ec1b5f914 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -33,6 +33,7 @@
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -249,7 +250,7 @@ public Long map(Long value) throws Exception {
                                clusterClient.shutdown();
                        }
                        if (dispatcherResourceManagerComponent != null) {
-                               dispatcherResourceManagerComponent.close();
+                               dispatcherResourceManagerComponent.deregisterApplicationAndClose(ApplicationStatus.SUCCEEDED, null);
                        }
 
                        fatalErrorHandler.rethrowError();
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties 
b/flink-yarn-tests/src/test/resources/log4j-test.properties
index 8f56c1fa9a5..42ae7ddf08e 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -35,7 +35,7 @@ log4j.logger.org.apache.flink.runtime.leaderelection=INFO
 log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
 
 log4j.logger.org.apache.directory=OFF
-log4j.logger.org.mortbay.log=OFF, testlogger
+log4j.logger.org.mortbay.log=OFF
 log4j.logger.net.sf.ehcache=OFF
 log4j.logger.org.apache.hadoop.metrics2=OFF
 log4j.logger.org.apache.hadoop.conf.Configuration=OFF


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Cancel flink job occurs java.net.ConnectException
> -------------------------------------------------
>
>                 Key: FLINK-10309
>                 URL: https://issues.apache.org/jira/browse/FLINK-10309
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, REST
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: vinoyang
>            Assignee: Gary Yao
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The problem occurs when using the Yarn per-job detached mode. Trying to 
> cancel with savepoint fails with the following exception before being able to 
> retrieve the savepoint path:
> exception stack trace : 
> {code:java}
> org.apache.flink.util.FlinkException: Could not cancel job xxxx.
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at 
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at 
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Number of retries has been exhausted.
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>         ... 1 more
> Caused by: java.util.concurrent.CompletionException: 
> java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx
>         at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>         at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>         at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>         at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>         ... 16 more
> Caused by: java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
>         ... 7 more
> {code}
> some discussion in mailing list : 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Cancel-flink-job-occur-exception-td24056.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
