[ https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656373#comment-16656373 ]
ASF GitHub Bot commented on FLINK-10309:
----------------------------------------
asfgit closed pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 9eaef34a33a..b2d6d150561 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -445,9 +445,7 @@ private Configuration generateClusterConfiguration(Configuration configuration)
	private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
synchronized (lock) {
if (clusterComponent != null) {
-				final CompletableFuture<Void> deregisterApplicationFuture = clusterComponent.deregisterApplication(applicationStatus, diagnostics);
-
-				return FutureUtils.runAfterwards(deregisterApplicationFuture, clusterComponent::closeAsync);
+				return clusterComponent.deregisterApplicationAndClose(applicationStatus, diagnostics);
} else {
return CompletableFuture.completedFuture(null);
}
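
The hunk above delegates the previous deregister-then-close sequence to a single call. The ordering pattern it depends on — run a follow-up asynchronous action once a first future has completed, whether normally or exceptionally, while still surfacing the first failure — can be sketched with plain CompletableFuture. This is only an illustrative sketch of that pattern, not Flink's FutureUtils implementation:

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ComposeAfterwardsSketch {

	// Runs composedAction after 'first' completes (normally or exceptionally) and
	// completes the returned future once both are done, preferring the first error.
	static CompletableFuture<Void> composeAfterwards(
			CompletableFuture<?> first,
			Supplier<CompletableFuture<?>> composedAction) {

		CompletableFuture<Void> result = new CompletableFuture<>();

		first.whenComplete((ignored, firstError) ->
			composedAction.get().whenComplete((alsoIgnored, secondError) -> {
				if (firstError != null) {
					result.completeExceptionally(firstError);
				} else if (secondError != null) {
					result.completeExceptionally(secondError);
				} else {
					result.complete(null);
				}
			}));

		return result;
	}

	public static void main(String[] args) {
		CompletableFuture<Void> deregistered = CompletableFuture.completedFuture(null);
		CompletableFuture<Void> closed = composeAfterwards(
			deregistered,
			() -> CompletableFuture.runAsync(() -> System.out.println("closing cluster component")));
		closed.join();
	}
}
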
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index b07095c2b6d..6e28ab6c6b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -26,7 +26,6 @@
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import javax.annotation.Nonnull;
@@ -41,7 +40,7 @@
 * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
 * in the same process.
 */
-public class DispatcherResourceManagerComponent<T extends Dispatcher> implements AutoCloseableAsync {
+public class DispatcherResourceManagerComponent<T extends Dispatcher> {
@Nonnull
private final T dispatcher;
@@ -126,65 +125,77 @@ public T getDispatcher() {
		return webMonitorEndpoint;
	}
-	@Override
-	public CompletableFuture<Void> closeAsync() {
+	/**
+	 * Deregister the Flink application from the resource management system by signalling
+	 * the {@link ResourceManager}.
+	 *
+	 * @param applicationStatus to terminate the application with
+	 * @param diagnostics additional information about the shut down, can be {@code null}
+	 * @return Future which is completed once the shut down
+	 */
+	public CompletableFuture<Void> deregisterApplicationAndClose(
+			final ApplicationStatus applicationStatus,
+			final @Nullable String diagnostics) {
+
		if (isRunning.compareAndSet(true, false)) {
-			Exception exception = null;
+			final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
+				FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () -> deregisterApplication(applicationStatus, diagnostics));
-			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
+			return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture, this::closeAsyncInternal);
+		} else {
+			return terminationFuture;
+		}
+	}
-			try {
-				dispatcherLeaderRetrievalService.stop();
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
+	private CompletableFuture<Void> deregisterApplication(
+			final ApplicationStatus applicationStatus,
+			final @Nullable String diagnostics) {
-			try {
-				resourceManagerRetrievalService.stop();
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
+		final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+		return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+	}
-			terminationFutures.add(webMonitorEndpoint.closeAsync());
+	private CompletableFuture<Void> closeAsyncInternal() {
+		Exception exception = null;
-			dispatcher.shutDown();
-			terminationFutures.add(dispatcher.getTerminationFuture());
+		final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
-			resourceManager.shutDown();
-			terminationFutures.add(resourceManager.getTerminationFuture());
+		try {
+			dispatcherLeaderRetrievalService.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
-			if (exception != null) {
-				terminationFutures.add(FutureUtils.completedExceptionally(exception));
-			}
+		try {
+			resourceManagerRetrievalService.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
-			final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+		dispatcher.shutDown();
+		terminationFutures.add(dispatcher.getTerminationFuture());
-			final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards(
-				componentTerminationFuture,
-				jobManagerMetricGroup::close);
+		resourceManager.shutDown();
+		terminationFutures.add(resourceManager.getTerminationFuture());
-			metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
-				if (throwable != null) {
-					terminationFuture.completeExceptionally(throwable);
-				} else {
-					terminationFuture.complete(aVoid);
-				}
-			});
+		if (exception != null) {
+			terminationFutures.add(FutureUtils.completedExceptionally(exception));
		}
-		return terminationFuture;
-	}
+		final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
-	/**
-	 * Deregister the Flink application from the resource management system by signalling
-	 * the {@link ResourceManager}.
-	 *
-	 * @param applicationStatus to terminate the application with
-	 * @param diagnostics additional information about the shut down, can be {@code null}
-	 * @return Future which is completed once the shut down
-	 */
-	public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
-		final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-		return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+		final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards(
+			componentTerminationFuture,
+			jobManagerMetricGroup::close);
+
+		metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
+			if (throwable != null) {
+				terminationFuture.completeExceptionally(throwable);
+			} else {
+				terminationFuture.complete(aVoid);
+			}
+		});
+
+		return terminationFuture;
	}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 31c6a97124b..5d2d363cf71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -679,7 +679,7 @@ public void acknowledgeCheckpoint(
try {
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
} catch (Throwable t) {
-						log.warn("Error while processing checkpoint acknowledgement message");
+						log.warn("Error while processing checkpoint acknowledgement message", t);
}
});
} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 43636dd8f9e..068192cf794 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -63,6 +63,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* An abstract class for netty-based REST server endpoints.
@@ -85,6 +86,7 @@
private final CompletableFuture<Void> terminationFuture;
+	private List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
private ServerBootstrap bootstrap;
private Channel serverChannel;
private String restBaseUrl;
@@ -131,7 +133,7 @@ public final void start() throws Exception {
final Router router = new Router();
		final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
-		List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);
+		handlers = initializeHandlers(restAddressFuture);
		/* sort the handlers such that they are ordered the following:
* /jobs
@@ -265,10 +267,13 @@ public String getRestBaseUrl() {
log.info("Shutting down rest endpoint.");
if (state == State.RUNNING) {
-			final CompletableFuture<Void> shutDownFuture = shutDownInternal();
+			final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
+ closeHandlersAsync(),
+ this::shutDownInternal);
shutDownFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
+ log.info("Shut down complete.");
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
@@ -285,6 +290,14 @@ public String getRestBaseUrl() {
}
}
+ private FutureUtils.ConjunctFuture<Void> closeHandlersAsync() {
+ return FutureUtils.waitForAll(handlers.stream()
+ .map(tuple -> tuple.f1)
+			.filter(handler -> handler instanceof AutoCloseableAsync)
+			.map(handler -> ((AutoCloseableAsync) handler).closeAsync())
+ .collect(Collectors.toList()));
+ }
+
/**
* Stops this REST server endpoint.
*
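
The closeHandlersAsync() method added above picks out the handlers that support asynchronous closing and waits for all of their close futures. A self-contained sketch of that aggregation using only JDK types is shown below; AsyncCloseable is a made-up stand-in for Flink's AutoCloseableAsync, and CompletableFuture.allOf stands in for FutureUtils.waitForAll (which additionally collects all failures):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class CloseHandlersSketch {

	/** Anything that can be shut down asynchronously (placeholder for AutoCloseableAsync). */
	interface AsyncCloseable {
		CompletableFuture<Void> closeAsync();
	}

	/** Collects the close futures of all closeable handlers and waits for all of them. */
	static CompletableFuture<Void> closeAllAsync(List<?> handlers) {
		List<CompletableFuture<Void>> closeFutures = handlers.stream()
			.filter(handler -> handler instanceof AsyncCloseable)
			.map(handler -> ((AsyncCloseable) handler).closeAsync())
			.collect(Collectors.toList());

		// Completes once every close future has completed.
		return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]));
	}

	public static void main(String[] args) {
		AsyncCloseable handler = () -> CompletableFuture.completedFuture(null);
		closeAllAsync(Arrays.asList(handler, "not closeable")).join();
		System.out.println("all handlers closed");
	}
}
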
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
similarity index 87%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 3d1ec9d0066..5a1c371d5a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -16,14 +16,11 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.rest;
+package org.apache.flink.runtime.rest.handler;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.FileUploads;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.HandlerRequestException;
-import org.apache.flink.runtime.rest.handler.RedirectHandler;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.FileUploadHandler;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -33,6 +30,7 @@
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -64,7 +62,7 @@
* @param <R> type of the incoming request
* @param <M> type of the message parameters
*/
-public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends RedirectHandler<T> {
+public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends RedirectHandler<T> implements AutoCloseableAsync {
protected final Logger log = LoggerFactory.getLogger(getClass());
@@ -72,6 +70,11 @@
	private final UntypedResponseMessageHeaders<R, M> untypedResponseMessageHeaders;
+ /**
+	 * Used to ensure that the handler is not closed while there are still in-flight requests.
+ */
+ private final InFlightRequestTracker inFlightRequestTracker;
+
protected AbstractHandler(
@Nonnull CompletableFuture<String> localAddressFuture,
@Nonnull GatewayRetriever<? extends T> leaderRetriever,
@@ -81,6 +84,7 @@ protected AbstractHandler(
		super(localAddressFuture, leaderRetriever, timeout, responseHeaders);
		this.untypedResponseMessageHeaders = Preconditions.checkNotNull(untypedResponseMessageHeaders);
+ this.inFlightRequestTracker = new InFlightRequestTracker();
}
@Override
@@ -92,6 +96,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
FileUploads uploadedFiles = null;
try {
+ inFlightRequestTracker.registerRequest();
if (!(httpRequest instanceof FullHttpRequest)) {
				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
// FullHttpRequests.
@@ -154,8 +159,12 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
final FileUploads finalUploadedFiles = uploadedFiles;
requestProcessingFuture
-				.whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+				.whenComplete((Void ignored, Throwable throwable) -> {
+					inFlightRequestTracker.deregisterRequest();
+ cleanupFileUploads(finalUploadedFiles);
+ });
} catch (RestHandlerException rhe) {
+ inFlightRequestTracker.deregisterRequest();
HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
@@ -164,6 +173,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
responseHeaders);
cleanupFileUploads(uploadedFiles);
} catch (Throwable e) {
+ inFlightRequestTracker.deregisterRequest();
log.error("Request processing failed.", e);
HandlerUtils.sendErrorResponse(
ctx,
@@ -175,6 +185,15 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
}
}
+ @Override
+ public final CompletableFuture<Void> closeAsync() {
+		return FutureUtils.composeAfterwards(closeHandlerAsync(), inFlightRequestTracker::awaitAsync);
+ }
+
+ protected CompletableFuture<Void> closeHandlerAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
if (uploadedFiles != null) {
try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 9cfb58e98a8..0397cb875f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.rest.AbstractHandler;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -81,11 +80,9 @@ protected AbstractRestHandler(
response = FutureUtils.completedExceptionally(e);
}
- return response.whenComplete((P resp, Throwable throwable) -> {
-			Tuple2<ResponseBody, HttpResponseStatus> r = throwable != null ?
-				errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode());
-			HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders);
- }).thenApply(ignored -> null);
+ return response.handle((resp, throwable) -> throwable != null ?
+			errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
+			.thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
}
	private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
new file mode 100644
index 00000000000..92478b1886f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Phaser;
+
+/**
+ * Tracks in-flight client requests.
+ *
+ * @see AbstractHandler
+ */
+@ThreadSafe
+class InFlightRequestTracker {
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+ private final Phaser phaser = new Phaser(1) {
+ @Override
+		protected boolean onAdvance(final int phase, final int registeredParties) {
+ terminationFuture.complete(null);
+ return true;
+ }
+ };
+
+ /**
+ * Registers an in-flight request.
+ */
+ public void registerRequest() {
+ phaser.register();
+ }
+
+ /**
+ * Deregisters an in-flight request.
+ */
+ public void deregisterRequest() {
+ phaser.arriveAndDeregister();
+ }
+
+ /**
+	 * Returns a future that completes when the in-flight requests that were registered prior to
+ * calling this method are deregistered.
+ */
+ public CompletableFuture<Void> awaitAsync() {
+ phaser.arriveAndDeregister();
+ return terminationFuture;
+ }
+}
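
The new InFlightRequestTracker above is built around java.util.concurrent.Phaser: each request registers a party, awaitAsync() arrives with and deregisters the tracker's own pre-registered party, and onAdvance fires once the remaining parties (the in-flight requests) have arrived. The following standalone sketch reproduces that idiom with a small demonstration of the expected behaviour; it mirrors the class above but is not the Flink source itself:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;

public class InFlightRequestTrackerSketch {

	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

	// One party is pre-registered for the tracker itself; it arrives in awaitAsync().
	private final Phaser phaser = new Phaser(1) {
		@Override
		protected boolean onAdvance(int phase, int registeredParties) {
			// All parties (tracker + every in-flight request) have arrived.
			terminationFuture.complete(null);
			return true; // terminate the phaser
		}
	};

	void registerRequest() {
		phaser.register();
	}

	void deregisterRequest() {
		phaser.arriveAndDeregister();
	}

	CompletableFuture<Void> awaitAsync() {
		phaser.arriveAndDeregister();
		return terminationFuture;
	}

	public static void main(String[] args) {
		InFlightRequestTrackerSketch tracker = new InFlightRequestTrackerSketch();
		tracker.registerRequest();

		CompletableFuture<Void> drained = tracker.awaitAsync();
		System.out.println("done before deregister? " + drained.isDone()); // false

		tracker.deregisterRequest();
		System.out.println("done after deregister? " + drained.isDone());  // true
	}
}
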
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
index e0c3fbec152..32b04921e42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.async;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.NotFoundException;
@@ -29,24 +28,14 @@
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.types.Either;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.ThreadSafe;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
/**
* HTTP handlers for asynchronous operations.
@@ -176,7 +165,7 @@ protected StatusHandler(
final Either<Throwable, R> operationResultOrError;
try {
				operationResultOrError = completedOperationCache.get(key);
- } catch (UnknownOperationKey e) {
+ } catch (UnknownOperationKeyException e) {
return FutureUtils.completedExceptionally(
						new NotFoundException("Operation not found under key: " + key, e));
}
@@ -194,6 +183,11 @@ protected StatusHandler(
}
}
+ @Override
+ public CompletableFuture<Void> closeHandlerAsync() {
+ return completedOperationCache.closeAsync();
+ }
+
/**
		 * Extract the operation key under which the operation result future is stored.
*
@@ -220,79 +214,4 @@ protected StatusHandler(
		protected abstract V operationResultResponse(R operationResult);
	}
-	/**
-	 * Cache to manage ongoing operations.
-	 *
-	 * <p>The cache allows to register ongoing operations by calling
-	 * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
-	 * {@code CompletableFuture} contains the operation result. Completed operations will be
-	 * removed from the cache automatically after a fixed timeout.
-	 */
-	@ThreadSafe
-	protected static class CompletedOperationCache<K, R> {
-
-		private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
-
-		/**
-		 * Stores SavepointKeys of ongoing savepoint.
-		 * If the savepoint completes, it will be moved to {@link #completedOperations}.
-		 */
-		private final Set<K> registeredOperationTriggers = ConcurrentHashMap.newKeySet();
-
-		/** Caches the location of completed operations. */
-		private final Cache<K, Either<Throwable, R>> completedOperations =
-			CacheBuilder.newBuilder()
-				.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
-				.build();
-
-		/**
-		 * Registers an ongoing operation with the cache.
-		 *
-		 * @param operationResultFuture A future containing the operation result.
-		 */
-		public void registerOngoingOperation(
-				final K operationKey,
-				final CompletableFuture<R> operationResultFuture) {
-			registeredOperationTriggers.add(operationKey);
-			operationResultFuture.whenComplete((savepointLocation, error) -> {
-				if (error == null) {
-					completedOperations.put(operationKey, Either.Right(savepointLocation));
-				} else {
-					completedOperations.put(operationKey, Either.Left(error));
-				}
-				registeredOperationTriggers.remove(operationKey);
-			});
-		}
-
-		/**
-		 * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
-		 * finished, otherwise {@code null}.
-		 *
-		 * @throws UnknownOperationKey If the operation is not found, and there is no ongoing
-		 *                             operation under the provided key.
-		 */
-		@Nullable
-		public Either<Throwable, R> get(
-				final K operationKey) throws UnknownOperationKey {
-			Either<Throwable, R> operationResultOrError = null;
-			if (!registeredOperationTriggers.contains(operationKey)
-				&& (operationResultOrError = completedOperations.getIfPresent(operationKey)) == null) {
-				throw new UnknownOperationKey(operationKey);
-			}
-			return operationResultOrError;
-		}
-	}
-
-	/**
-	 * Exception that indicates that there is no ongoing or completed savepoint for a given
- * {@link JobID} and {@link TriggerId} pair.
- */
- static class UnknownOperationKey extends FlinkException {
- private static final long serialVersionUID = 1L;
-
- UnknownOperationKey(final Object operationKey) {
- super("No ongoing operation for " + operationKey);
- }
- }
-
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
new file mode 100644
index 00000000000..95bb2239fb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * <p>The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseableAsync {
+
+	private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(CompletedOperationCache.class);
+
+	/**
+	 * In-progress asynchronous operations.
+	 */
+	private final Map<K, ResultAccessTracker<R>> registeredOperationTriggers = new ConcurrentHashMap<>();
+
+ /**
+ * Caches the result of completed operations.
+ */
+ private final Cache<K, ResultAccessTracker<R>> completedOperations;
+
+ CompletedOperationCache() {
+ this(Ticker.systemTicker());
+ }
+
+ @VisibleForTesting
+ CompletedOperationCache(final Ticker ticker) {
+ completedOperations = CacheBuilder.newBuilder()
+			.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+			.removalListener((RemovalListener<K, ResultAccessTracker<R>>) removalNotification -> {
+				if (removalNotification.wasEvicted()) {
+					Preconditions.checkState(removalNotification.getKey() != null);
+					Preconditions.checkState(removalNotification.getValue() != null);
+
+					// When shutting down the cache, we wait until all results are accessed.
+					// When a result gets evicted from the cache, it will not be possible to access
+					// it any longer, and we might be in the process of shutting down, so we mark
+					// the result as accessed to avoid waiting indefinitely.
+					removalNotification.getValue().markAccessed();
+
+					LOGGER.info("Evicted result with trigger id {} because its TTL of {}s has expired.",
+						removalNotification.getKey().getTriggerId(),
+						COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS);
+ }
+ })
+ .ticker(ticker)
+ .build();
+ }
+
+ /**
+ * Registers an ongoing operation with the cache.
+ *
+	 * @param operationResultFuture A future containing the operation result.
+	 */
+	public void registerOngoingOperation(
+			final K operationKey,
+			final CompletableFuture<R> operationResultFuture) {
+		final ResultAccessTracker<R> inProgress = ResultAccessTracker.inProgress();
+		registeredOperationTriggers.put(operationKey, inProgress);
+		operationResultFuture.whenComplete((result, error) -> {
+			if (error == null) {
+				completedOperations.put(operationKey, inProgress.finishOperation(Either.Right(result)));
+			} else {
+				completedOperations.put(operationKey, inProgress.finishOperation(Either.Left(error)));
+ }
+ registeredOperationTriggers.remove(operationKey);
+ });
+ }
+
+ /**
+	 * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
+	 * finished, otherwise {@code null}.
+	 *
+	 * @throws UnknownOperationKeyException If the operation is not found, and there is no ongoing
+	 *                                      operation under the provided key.
+	 */
+	@Nullable
+	public Either<Throwable, R> get(
+			final K operationKey) throws UnknownOperationKeyException {
+		ResultAccessTracker<R> resultAccessTracker;
+		if ((resultAccessTracker = registeredOperationTriggers.get(operationKey)) == null
+			&& (resultAccessTracker = completedOperations.getIfPresent(operationKey)) == null) {
+ throw new UnknownOperationKeyException(operationKey);
+ }
+
+ return resultAccessTracker.accessOperationResultOrError();
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return FutureUtils.orTimeout(
+ asyncWaitForResultsToBeAccessed(),
+ COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS,
+ TimeUnit.SECONDS);
+ }
+
+ private CompletableFuture<Void> asyncWaitForResultsToBeAccessed() {
+ return FutureUtils.waitForAll(
+			Stream.concat(registeredOperationTriggers.values().stream(), completedOperations.asMap().values().stream())
+ .map(ResultAccessTracker::getAccessedFuture)
+ .collect(Collectors.toList()));
+ }
+
+ @VisibleForTesting
+ void cleanUp() {
+ completedOperations.cleanUp();
+ }
+
+ /**
+	 * Stores the result of an asynchronous operation, and tracks accesses to it.
+	 */
+	private static class ResultAccessTracker<R> {
+
+		/** Result of an asynchronous operation. Null if operation is in progress. */
+		@Nullable
+		private final Either<Throwable, R> operationResultOrError;
+
+		/** Future that completes if a non-null {@link #operationResultOrError} is accessed. */
+ private final CompletableFuture<Void> accessed;
+
+ private static <R> ResultAccessTracker<R> inProgress() {
+ return new ResultAccessTracker<>();
+ }
+
+ private ResultAccessTracker() {
+ this.operationResultOrError = null;
+ this.accessed = new CompletableFuture<>();
+ }
+
+		private ResultAccessTracker(final Either<Throwable, R> operationResultOrError, final CompletableFuture<Void> accessed) {
+			this.operationResultOrError = checkNotNull(operationResultOrError);
+ this.accessed = checkNotNull(accessed);
+ }
+
+ /**
+		 * Creates a new instance of the tracker with the result of the asynchronous operation set.
+		 */
+		public ResultAccessTracker<R> finishOperation(final Either<Throwable, R> operationResultOrError) {
+			checkState(this.operationResultOrError == null);
+
+			return new ResultAccessTracker<>(checkNotNull(operationResultOrError), this.accessed);
+ }
+
+ /**
+		 * If present, returns the result of the asynchronous operation, and marks the result as
+		 * accessed. If the result is not present, this method returns null.
+ */
+ @Nullable
+ public Either<Throwable, R> accessOperationResultOrError() {
+ if (operationResultOrError != null) {
+ markAccessed();
+ }
+ return operationResultOrError;
+ }
+
+ public CompletableFuture<Void> getAccessedFuture() {
+ return accessed;
+ }
+
+ private void markAccessed() {
+ accessed.complete(null);
+ }
+
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java
new file mode 100644
index 00000000000..962b80fe5e6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception that indicates that there is no ongoing or completed savepoint for a given
+ * {@link JobID} and {@link TriggerId} pair.
+ */
+class UnknownOperationKeyException extends FlinkException {
+ private static final long serialVersionUID = 1L;
+
+ UnknownOperationKeyException(final Object operationKey) {
+ super("No ongoing operation for " + operationKey);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
index 4bb473e9ceb..e773702274f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
@@ -33,7 +33,7 @@
* A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based
* collection.
*
- * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache
+ * @see AbstractAsynchronousOperationHandlers
*/
@Immutable
public class AsynchronousJobOperationKey extends OperationKey {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 4c7ac9406b1..8a20868ce37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,7 +24,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
-import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index b407ada46e6..b60afbb5948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -45,6 +45,7 @@
import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -69,7 +70,7 @@
* @param headers additional header values
* @param <P> type of the response
*/
- public static <P extends ResponseBody> void sendResponse(
+	public static <P extends ResponseBody> CompletableFuture<Void> sendResponse(
ChannelHandlerContext channelHandlerContext,
HttpRequest httpRequest,
P response,
@@ -80,15 +81,14 @@
mapper.writeValue(sw, response);
} catch (IOException ioe) {
			LOG.error("Internal server error. Could not map response to JSON.", ioe);
-			sendErrorResponse(
+			return sendErrorResponse(
				channelHandlerContext,
				httpRequest,
				new ErrorResponseBody("Internal server error. Could not map response to JSON."),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
headers);
- return;
}
- sendResponse(
+ return sendResponse(
channelHandlerContext,
httpRequest,
sw.toString(),
@@ -105,14 +105,14 @@
* @param statusCode of the message to send
* @param headers additional header values
*/
- public static void sendErrorResponse(
+ public static CompletableFuture<Void> sendErrorResponse(
ChannelHandlerContext channelHandlerContext,
HttpRequest httpRequest,
ErrorResponseBody errorMessage,
HttpResponseStatus statusCode,
Map<String, String> headers) {
- sendErrorResponse(
+ return sendErrorResponse(
channelHandlerContext,
HttpHeaders.isKeepAlive(httpRequest),
errorMessage,
@@ -129,7 +129,7 @@ public static void sendErrorResponse(
* @param statusCode of the message to send
* @param headers additional header values
*/
- public static void sendErrorResponse(
+ public static CompletableFuture<Void> sendErrorResponse(
ChannelHandlerContext channelHandlerContext,
boolean keepAlive,
ErrorResponseBody errorMessage,
@@ -142,14 +142,14 @@ public static void sendErrorResponse(
} catch (IOException e) {
// this should never happen
			LOG.error("Internal server error. Could not map error response to JSON.", e);
-			sendResponse(
+			return sendResponse(
				channelHandlerContext,
				keepAlive,
				"Internal server error. Could not map error response to JSON.",
HttpResponseStatus.INTERNAL_SERVER_ERROR,
headers);
}
- sendResponse(
+ return sendResponse(
channelHandlerContext,
keepAlive,
sw.toString(),
@@ -166,14 +166,14 @@ public static void sendErrorResponse(
* @param statusCode of the message to send
* @param headers additional header values
*/
- public static void sendResponse(
+ public static CompletableFuture<Void> sendResponse(
@Nonnull ChannelHandlerContext channelHandlerContext,
@Nonnull HttpRequest httpRequest,
@Nonnull String message,
@Nonnull HttpResponseStatus statusCode,
@Nonnull Map<String, String> headers) {
- sendResponse(
+ return sendResponse(
channelHandlerContext,
HttpHeaders.isKeepAlive(httpRequest),
message,
@@ -190,7 +190,7 @@ public static void sendResponse(
* @param statusCode of the message to send
* @param headers additional header values
*/
- public static void sendResponse(
+ public static CompletableFuture<Void> sendResponse(
@Nonnull ChannelHandlerContext channelHandlerContext,
boolean keepAlive,
@Nonnull String message,
@@ -223,5 +223,19 @@ public static void sendResponse(
if (!keepAlive) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
+
+ return toCompletableFuture(lastContentFuture);
+ }
+
+	private static CompletableFuture<Void> toCompletableFuture(final ChannelFuture channelFuture) {
+		final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ channelFuture.addListener(future -> {
+ if (future.isSuccess()) {
+ completableFuture.complete(null);
+ } else {
+				completableFuture.completeExceptionally(future.cause());
+ }
+ });
+ return completableFuture;
}
}
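
The toCompletableFuture helper added above adapts Netty's callback-style ChannelFuture to a CompletableFuture. The same bridging pattern can be shown without a Netty dependency; CallbackFuture below is a made-up stand-in for a listener-based future, not Netty's API:

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class FutureBridgeSketch {

	/** Minimal stand-in for a callback-style future such as Netty's ChannelFuture. */
	interface CallbackFuture {
		boolean isSuccess();
		Throwable cause();
		void addListener(Consumer<CallbackFuture> listener);
	}

	/** Bridges a callback-style future into a CompletableFuture. */
	static CompletableFuture<Void> toCompletableFuture(CallbackFuture callbackFuture) {
		final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
		callbackFuture.addListener(future -> {
			if (future.isSuccess()) {
				completableFuture.complete(null);
			} else {
				completableFuture.completeExceptionally(future.cause());
			}
		});
		return completableFuture;
	}

	public static void main(String[] args) {
		// A trivially successful callback future for demonstration.
		CallbackFuture alreadyDone = new CallbackFuture() {
			public boolean isSuccess() { return true; }
			public Throwable cause() { return null; }
			public void addListener(Consumer<CallbackFuture> listener) { listener.accept(this); }
		};
		System.out.println("bridged future done: " + toCompletableFuture(alreadyDone).isDone());
	}
}
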
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
index c289634bc07..cc116dd7714 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
@@ -27,6 +27,7 @@
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ManualTicker;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -281,20 +282,6 @@ private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File st
Ticker.systemTicker());
}
- private static final class ManualTicker extends Ticker {
-
- private long currentTime = 0;
-
- @Override
- public long read() {
- return currentTime;
- }
-
- void advanceTime(long duration, TimeUnit timeUnit) {
- currentTime += timeUnit.toNanos(duration);
- }
- }
-
	private static final class PartialArchivedExecutionGraphMatcher extends BaseMatcher<ArchivedExecutionGraph> {
private final ArchivedExecutionGraph archivedExecutionGraph;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 430bfad38fd..0c28745518e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -91,14 +91,18 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -131,6 +135,8 @@
private SSLContext defaultSSLContext;
private SSLSocketFactory defaultSSLSocketFactory;
+ private TestHandler testHandler;
+
public RestServerEndpointITCase(final Configuration config) {
this.config = requireNonNull(config);
}
@@ -194,7 +200,7 @@ public void setup() throws Exception {
		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
			CompletableFuture.completedFuture(mockRestfulGateway);
- TestHandler testHandler = new TestHandler(
+ testHandler = new TestHandler(
CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT);
@@ -253,7 +259,7 @@ public void teardown() throws Exception {
}
if (serverEndpoint != null) {
- serverEndpoint.close();
+			serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit());
serverEndpoint = null;
}
}
@@ -264,37 +270,25 @@ public void teardown() throws Exception {
*/
@Test
public void testRequestInterleaving() throws Exception {
-
- TestParameters parameters = new TestParameters();
- parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		testHandler.handlerBody = id -> {
+			if (id == 1) {
+				handlerBlocker.arriveAndBlock();
+			}
+			return CompletableFuture.completedFuture(new TestResponse(id));
+ };
// send first request and wait until the handler blocks
- CompletableFuture<TestResponse> response1;
-
- synchronized (TestHandler.LOCK) {
- response1 = restClient.sendRequest(
- serverAddress.getHostName(),
- serverAddress.getPort(),
- new TestHeaders(),
- parameters,
- new TestRequest(1));
- TestHandler.LOCK.wait();
- }
+		final CompletableFuture<TestResponse> response1 = sendRequestToTestHandler(new TestRequest(1));
+		handlerBlocker.awaitRequestToArrive();
		// send second request and verify response
-		CompletableFuture<TestResponse> response2 = restClient.sendRequest(
-			serverAddress.getHostName(),
-			serverAddress.getPort(),
-			new TestHeaders(),
-			parameters,
-			new TestRequest(2));
+		final CompletableFuture<TestResponse> response2 = sendRequestToTestHandler(new TestRequest(2));
assertEquals(2, response2.get().id);
// wake up blocked handler
- synchronized (TestHandler.LOCK) {
- TestHandler.LOCK.notifyAll();
- }
+ handlerBlocker.unblockRequest();
+
// verify response to first request
assertEquals(1, response1.get().id);
}
@@ -335,41 +329,34 @@ public void testBadHandlerRequest() throws Exception {
	}
	/**
-	 * Tests that requests and responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH}
-	 * are rejected by the server and client, respectively.
+	 * Tests that requests larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} are rejected.
	 */
	@Test
-	public void testMaxContentLengthLimit() throws Exception {
-		final TestParameters parameters = new TestParameters();
-		parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
-
-		CompletableFuture<TestResponse> response;
-		response = restClient.sendRequest(
-			serverAddress.getHostName(),
-			serverAddress.getPort(),
-			new TestHeaders(),
-			parameters,
-			new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+	public void testShouldRespectMaxContentLengthLimitForRequests() throws Exception {
+		testHandler.handlerBody = id -> {
+			throw new AssertionError("Request should not arrive at server.");
+		};
		try {
-			response.get();
+			sendRequestToTestHandler(new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH))).get();
			fail("Expected exception not thrown");
		} catch (final ExecutionException e) {
			final Throwable throwable = ExceptionUtils.stripExecutionException(e);
			assertThat(throwable, instanceOf(RestClientException.class));
			assertThat(throwable.getMessage(), containsString("Try to raise"));
}
+ }
- response = restClient.sendRequest(
- serverAddress.getHostName(),
- serverAddress.getPort(),
- new TestHeaders(),
- parameters,
- new TestRequest(TestHandler.LARGE_RESPONSE_BODY_ID));
+ /**
+	 * Tests that responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} are rejected.
+	 */
+	@Test
+	public void testShouldRespectMaxContentLengthLimitForResponses() throws Exception {
+		testHandler.handlerBody = id -> CompletableFuture.completedFuture(
+			new TestResponse(id, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
try {
- response.get();
+ sendRequestToTestHandler(new TestRequest(1)).get();
fail("Expected exception not thrown");
} catch (final ExecutionException e) {
			final Throwable throwable = ExceptionUtils.stripExecutionException(e);
@@ -545,6 +532,43 @@ public void testNonSslRedirectForEnabledSsl() throws Exception {
}
}
+ /**
+	 * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
+	 * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
+	 * HTTP requests should be served.
+	 */
+	@Test
+	public void testShouldWaitForHandlersWhenClosing() throws Exception {
+		testHandler.closeFuture = new CompletableFuture<>();
+		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		testHandler.handlerBody = id -> {
+			// Intentionally schedule the work on a different thread. This is to simulate
+			// handlers where the CompletableFuture is finished by the RPC framework.
+			return CompletableFuture.supplyAsync(() -> {
+				handlerBlocker.arriveAndBlock();
+				return new TestResponse(id);
+			});
+		};
+
+		// Initiate closing RestServerEndpoint but the test handler should block.
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+		handlerBlocker.awaitRequestToArrive();
+
+		// Allow handler to close but there is still one in-flight request which should prevent
+ // the RestServerEndpoint from closing.
+ testHandler.closeFuture.complete(null);
+ assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+ // Finish the in-flight request.
+ handlerBlocker.unblockRequest();
+
+ request.get(timeout.getSize(), timeout.getUnit());
+		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+ }
+
	private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
		final HttpURLConnection connection =
			(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -587,9 +611,9 @@ protected void startInternal() {}
	private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
-		private static final Object LOCK = new Object();
+		private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
-		private static final int LARGE_RESPONSE_BODY_ID = 3;
+		private Function<Integer, CompletableFuture<TestResponse>> handlerBody;
TestHandler(
CompletableFuture<String> localAddressFuture,
@@ -604,25 +628,89 @@ protected void startInternal() {}
}
@Override
-		protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) throws RestHandlerException {
+		protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) {
			assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
			assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), QUERY_JOB_ID);
final int id = request.getRequestBody().id;
- if (id == 1) {
- synchronized (LOCK) {
- try {
- LOCK.notifyAll();
- LOCK.wait();
- } catch (InterruptedException ignored) {
- }
- }
- } else if (id == LARGE_RESPONSE_BODY_ID) {
-				return CompletableFuture.completedFuture(new TestResponse(
-					id,
-					createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+ return handlerBody.apply(id);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeHandlerAsync() {
+ return closeFuture;
+ }
+ }
+
+	private CompletableFuture<TestResponse> sendRequestToTestHandler(final TestRequest testRequest) {
+ try {
+ return restClient.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ new TestHeaders(),
+ createTestParameters(),
+ testRequest);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static TestParameters createTestParameters() {
+ final TestParameters parameters = new TestParameters();
+ parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+ return parameters;
+ }
+
+ /**
+	 * This is a helper class for tests that require to have fine-grained control over HTTP
+ * requests so that they are not dispatched immediately.
+ */
+ private static class HandlerBlocker {
+
+ private final Time timeout;
+
+		private final CountDownLatch requestArrivedLatch = new CountDownLatch(1);
+
+		private final CountDownLatch finishRequestLatch = new CountDownLatch(1);
+
+ private HandlerBlocker(final Time timeout) {
+ this.timeout = checkNotNull(timeout);
+ }
+
+ /**
+ * Waits until {@link #arriveAndBlock()} is called.
+ */
+ public void awaitRequestToArrive() {
+ try {
+				assertTrue(requestArrivedLatch.await(timeout.getSize(), timeout.getUnit()));
+			} catch (final InterruptedException e) {
+				Thread.currentThread().interrupt();
			}
-			return CompletableFuture.completedFuture(new TestResponse(id));
+		}
+
+		/**
+		 * Signals that the request arrived. This method blocks until {@link #unblockRequest()} is
+ * called.
+ */
+ public void arriveAndBlock() {
+ markRequestArrived();
+ try {
+				assertTrue(finishRequestLatch.await(timeout.getSize(), timeout.getUnit()));
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * @see #arriveAndBlock()
+ */
+ public void unblockRequest() {
+ finishRequestLatch.countDown();
+ }
+
+ private void markRequestArrived() {
+ requestArrivedLatch.countDown();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
similarity index 96%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
index 607c1c4bb06..ebb6656fdcd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.rest;
+package org.apache.flink.runtime.rest.handler;
-import org.apache.flink.runtime.rest.handler.FileUploads;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.router.RouteResult;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
new file mode 100644
index 00000000000..c486571d9cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link InFlightRequestTracker}.
+ */
+public class InFlightRequestTrackerTest {
+
+ private InFlightRequestTracker inFlightRequestTracker;
+
+ @Before
+ public void setUp() {
+ inFlightRequestTracker = new InFlightRequestTracker();
+ }
+
+ @Test
+ public void testShouldFinishAwaitAsyncImmediatelyIfNoRequests() {
+ assertTrue(inFlightRequestTracker.awaitAsync().isDone());
+ }
+
+ @Test
+ public void testShouldFinishAwaitAsyncIffAllRequestsDeregistered() {
+ inFlightRequestTracker.registerRequest();
+
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+ assertFalse(awaitFuture.isDone());
+
+ inFlightRequestTracker.deregisterRequest();
+ assertTrue(awaitFuture.isDone());
+ }
+
+ @Test
+ public void testAwaitAsyncIsIdempotent() {
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+ assertTrue(awaitFuture.isDone());
+
+ assertSame(
+ "The reference to the future must not change",
+ awaitFuture,
+ inFlightRequestTracker.awaitAsync());
+ }
+
+ @Test
+ public void testShouldTolerateRegisterAfterAwaitAsync() {
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+ assertTrue(awaitFuture.isDone());
+
+ inFlightRequestTracker.registerRequest();
+
+ assertSame(
+ "The reference to the future must not change",
+ awaitFuture,
+ inFlightRequestTracker.awaitAsync());
+ }
+}
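The tests above pin down the tracker's contract: awaitAsync() completes immediately when nothing is in flight, completes only once every registered request has been deregistered, always returns the same future instance, and tolerates registrations after shutdown has begun. Below is a minimal sketch of a tracker satisfying that contract; it is illustrative only, not the implementation merged by this PR, and the name SketchedInFlightRequestTracker is made up for the example.
{code:java}
import java.util.concurrent.CompletableFuture;

/** Minimal sketch of the contract exercised by InFlightRequestTrackerTest. */
final class SketchedInFlightRequestTracker {

	private final Object lock = new Object();

	// A single termination future makes awaitAsync() idempotent.
	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

	private int inFlight;

	private boolean terminating;

	/** Called when a request starts being handled. */
	void registerRequest() {
		synchronized (lock) {
			inFlight++;
		}
	}

	/** Called when a request finishes; completes the termination future if it was the last one. */
	void deregisterRequest() {
		synchronized (lock) {
			if (--inFlight == 0 && terminating) {
				terminationFuture.complete(null);
			}
		}
	}

	/** Returns a future that completes once no request is in flight anymore. */
	CompletableFuture<Void> awaitAsync() {
		synchronized (lock) {
			terminating = true;
			if (inFlight == 0) {
				terminationFuture.complete(null);
			}
			return terminationFuture;
		}
	}
}
{code}
Registrations arriving after awaitAsync() has already completed are still counted but no longer block the completed future, which is exactly what testShouldTolerateRegisterAfterAwaitAsync asserts.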
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
index eeb41a829b8..fb384a938d1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
@@ -179,6 +179,32 @@ public void testUnknownTriggerId() throws Exception {
}
}
+ /**
+ * Tests that the future returned by {@link AbstractAsynchronousOperationHandlers.StatusHandler#closeAsync()}
+ * completes when the result of the asynchronous operation is served.
+ */
+ @Test
+ public void testCloseShouldFinishOnFirstServedResult() throws Exception {
+ final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+ final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
+ .setTriggerSavepointFunction((JobID jobId, String directory) -> savepointFuture)
+ .build();
+
+ final TriggerId triggerId = testingTriggerHandler.handleRequest(
+ triggerOperationRequest(), testingRestfulGateway).get().getTriggerId();
+ final CompletableFuture<Void> closeFuture = testingStatusHandler.closeAsync();
+
+ testingStatusHandler.handleRequest(statusOperationRequest(triggerId), testingRestfulGateway).get();
+
+ assertThat(closeFuture.isDone(), is(false));
+
+ savepointFuture.complete("foobar");
+ testingStatusHandler.handleRequest(statusOperationRequest(triggerId), testingRestfulGateway).get();
+
+ assertThat(closeFuture.isDone(), is(true));
+ }
+
private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters> triggerOperationRequest() throws HandlerRequestException {
return new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance());
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
new file mode 100644
index 00000000000..2d13780f956
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.util.ManualTicker;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link CompletedOperationCache}.
+ */
+public class CompletedOperationCacheTest extends TestLogger {
+
+ private static final OperationKey TEST_OPERATION_KEY = new OperationKey(new TriggerId());
+
+ private static final CompletableFuture<String> TEST_OPERATION_RESULT = CompletableFuture.completedFuture("foo");
+
+ private ManualTicker manualTicker;
+
+ private CompletedOperationCache<OperationKey, String> completedOperationCache;
+
+ @Before
+ public void setUp() {
+ manualTicker = new ManualTicker();
+ completedOperationCache = new CompletedOperationCache<>(manualTicker);
+ }
+
+ @Test
+ public void testShouldFinishClosingCacheIfAllResultsAreEvicted() {
+ completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+ final CompletableFuture<Void> closeCacheFuture = completedOperationCache.closeAsync();
+ assertThat(closeCacheFuture.isDone(), is(false));
+
+ manualTicker.advanceTime(300, TimeUnit.SECONDS);
+ completedOperationCache.cleanUp();
+
+ assertThat(closeCacheFuture.isDone(), is(true));
+ }
+
+ @Test
+ public void testShouldFinishClosingCacheIfAllResultsAccessed() throws Exception {
+ completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+ final CompletableFuture<Void> closeCacheFuture = completedOperationCache.closeAsync();
+ assertThat(closeCacheFuture.isDone(), is(false));
+
+ final Either<Throwable, String> operationResultOrError = completedOperationCache.get(TEST_OPERATION_KEY);
+
+ assertThat(operationResultOrError, is(notNullValue()));
+ assertThat(operationResultOrError.right(), is(equalTo(TEST_OPERATION_RESULT.get())));
+ assertThat(closeCacheFuture.isDone(), is(true));
+ }
+}
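The two tests above describe the shutdown gating of the completed-operation cache: closeAsync() may complete only after every cached result has either been served to a client or evicted once its retention period has elapsed (which is what ManualTicker below simulates). A rough sketch of that gating idea follows; the names SketchedResultGate, resultRegistered and resultServedOrEvicted are hypothetical and not part of the Flink API.
{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Sketch of the close-gating idea exercised by CompletedOperationCacheTest. */
final class SketchedResultGate {

	private final Object lock = new Object();

	private final Set<Object> unservedResults = new HashSet<>();

	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

	private boolean closing;

	/** Called when a finished operation result is put into the cache. */
	void resultRegistered(Object key) {
		synchronized (lock) {
			unservedResults.add(key);
		}
	}

	/** Called when a result is handed to a client or evicted after its retention time. */
	void resultServedOrEvicted(Object key) {
		synchronized (lock) {
			unservedResults.remove(key);
			maybeComplete();
		}
	}

	/** Returns a future that completes once no unserved result remains. */
	CompletableFuture<Void> closeAsync() {
		synchronized (lock) {
			closing = true;
			maybeComplete();
			return terminationFuture;
		}
	}

	private void maybeComplete() {
		if (closing && unservedResults.isEmpty()) {
			terminationFuture.complete(null);
		}
	}
}
{code}
In the cache itself, ticker-driven eviction after the retention period is the second path that drains the set of unserved results, which is why advancing the ticker by 300 seconds and calling cleanUp() completes the close future in the first test.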
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
new file mode 100644
index 00000000000..d2e2e1d7bea
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Controllable {@link Ticker} implementation for tests.
+ */
+public final class ManualTicker extends Ticker {
+
+ private long currentTime;
+
+ @Override
+ public long read() {
+ return currentTime;
+ }
+
+ public void advanceTime(final long duration, final TimeUnit timeUnit) {
+ currentTime += timeUnit.toNanos(duration);
+ }
+}
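ManualTicker makes time-based eviction deterministic: a test injects it as the cache's time source, jumps it past the retention period, and forces a clean-up instead of sleeping. A usage sketch against a plain Guava cache is shown below; it assumes the unshaded com.google.common artifact and duplicates the ticker locally, since the class above extends Flink's shaded Guava 18 Ticker. The names ManualTickerExample and ManualTickerLikeTicker are made up for the example.
{code:java}
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.TimeUnit;

public class ManualTickerExample {

	public static void main(String[] args) {
		final ManualTickerLikeTicker ticker = new ManualTickerLikeTicker();

		final Cache<String, String> cache = CacheBuilder.newBuilder()
			.ticker(ticker)                           // time source under test control
			.expireAfterWrite(300, TimeUnit.SECONDS)  // entries expire 300 seconds after being written
			.build();

		cache.put("savepoint-1", "hdfs:///savepoints/sp-1");
		ticker.advanceTime(300, TimeUnit.SECONDS);    // jump past the retention period
		cache.cleanUp();                              // force eviction processing

		System.out.println(cache.getIfPresent("savepoint-1")); // prints: null
	}

	/** Stand-in for the ManualTicker above, written against unshaded Guava. */
	static final class ManualTickerLikeTicker extends Ticker {

		private long currentTime;

		@Override
		public long read() {
			return currentTime;
		}

		void advanceTime(long duration, TimeUnit timeUnit) {
			currentTime += timeUnit.toNanos(duration);
		}
	}
}
{code}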
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index afca8f12100..c5ec1b5f914 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -33,6 +33,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -249,7 +250,7 @@ public Long map(Long value) throws Exception {
clusterClient.shutdown();
}
if (dispatcherResourceManagerComponent != null) {
- dispatcherResourceManagerComponent.close();
+ dispatcherResourceManagerComponent.deregisterApplicationAndClose(ApplicationStatus.SUCCEEDED, null);
}
fatalErrorHandler.rethrowError();
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties
b/flink-yarn-tests/src/test/resources/log4j-test.properties
index 8f56c1fa9a5..42ae7ddf08e 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -35,7 +35,7 @@ log4j.logger.org.apache.flink.runtime.leaderelection=INFO
log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
log4j.logger.org.apache.directory=OFF
-log4j.logger.org.mortbay.log=OFF, testlogger
+log4j.logger.org.mortbay.log=OFF
log4j.logger.net.sf.ehcache=OFF
log4j.logger.org.apache.hadoop.metrics2=OFF
log4j.logger.org.apache.hadoop.conf.Configuration=OFF
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Cancel flink job occurs java.net.ConnectException
> -------------------------------------------------
>
> Key: FLINK-10309
> URL: https://issues.apache.org/jira/browse/FLINK-10309
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, REST
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: vinoyang
> Assignee: Gary Yao
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The problem occurs when using the YARN per-job detached mode. Trying to
> cancel with savepoint fails before the savepoint path can be retrieved.
> Exception stack trace:
> {code:java}
> org.apache.flink.util.FlinkException: Could not cancel job xxxx.
> at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
> at
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
> ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
> ... 1 more
> Caused by: java.util.concurrent.CompletionException:
> java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ... 16 more
> Caused by: java.net.ConnectException: Connect refuse: xxx/xxx.xxx.xxx.xxx:xxx
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
> ... 7 more
> {code}
> Some discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Cancel-flink-job-occur-exception-td24056.html
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)