Repository: flink Updated Branches: refs/heads/master 0cf7f7666 -> d7cea586e
[FLINK-7519] Add HttpResponseStatus to RestClientException Enrich the RestClientException with the HttpResponseStatus in case of failures. This makes the exception handling on the client side easier. This closes #4588. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7cea586 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7cea586 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7cea586 Branch: refs/heads/master Commit: d7cea586ec60f89ed06baf7ab95ebcf54f42a537 Parents: 0cf7f76 Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri Aug 25 12:15:38 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sun Sep 3 23:16:22 2017 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/rest/RestClient.java | 76 +++++++++++++++----- .../rest/handler/AbstractRestHandler.java | 2 +- .../rest/handler/RestHandlerException.java | 10 +-- .../runtime/rest/util/RestClientException.java | 20 ++++-- 4 files changed, 78 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 7422ece..ea266be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -48,6 +48,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRespon import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; @@ -85,15 +87,15 @@ public class RestClient { this.executor = Preconditions.checkNotNull(executor); SSLEngine sslEngine = configuration.getSslEngine(); - ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() { + ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(SocketChannel socketChannel) throws Exception { // SSL should be the first handler in the pipeline if (sslEngine != null) { - ch.pipeline().addLast("ssl", new SslHandler(sslEngine)); + socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngine)); } - ch.pipeline() + socketChannel.pipeline() .addLast(new HttpClientCodec()) .addLast(new HttpObjectAggregator(1024 * 1024)) .addLast(new ClientHandler()) @@ -150,7 +152,7 @@ public class RestClient { httpRequest.headers() .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity()) .add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name()) - .set(HttpHeaders.Names.HOST, targetAddress + ":" + targetPort) + .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); return submitRequest(targetAddress, targetPort, httpRequest, messageHeaders.getResponseClass()); @@ -168,29 +170,36 @@ public class RestClient { .thenApply((ChannelFuture::channel)) .thenCompose(channel -> { ClientHandler handler = channel.pipeline().get(ClientHandler.class); - CompletableFuture<JsonNode> future = handler.getJsonFuture(); + CompletableFuture<JsonResponse> future = handler.getJsonFuture(); channel.writeAndFlush(httpRequest); - return future.thenComposeAsync(rawResponse -> parseResponse(rawResponse, responseClass), executor); - }); + return future; + }).thenComposeAsync( + (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass), + executor + ); } - private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) { + private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse rawResponse, Class<P> responseClass) { CompletableFuture<P> responseFuture = new CompletableFuture<>(); try { - P response = objectMapper.treeToValue(rawResponse, responseClass); + P response = objectMapper.treeToValue(rawResponse.getJson(), responseClass); responseFuture.complete(response); } catch (JsonProcessingException jpe) { // the received response did not matched the expected response type // lets see if it is an ErrorResponse instead try { - ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class); - responseFuture.completeExceptionally(new RestClientException(error.errors.toString())); + ErrorResponseBody error = objectMapper.treeToValue(rawResponse.getJson(), ErrorResponseBody.class); + responseFuture.completeExceptionally(new RestClientException(error.errors.toString(), rawResponse.getHttpResponseStatus())); } catch (JsonProcessingException jpe2) { // if this fails it is either the expected type or response type was wrong, most likely caused // by a client/search MessageHeaders mismatch LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse, jpe2); - responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error.", jpe2)); + responseFuture.completeExceptionally( + new RestClientException( + "Response was neither of the expected type(" + responseClass + ") nor an error.", + jpe2, + rawResponse.getHttpResponseStatus())); } } return responseFuture; @@ -198,9 +207,9 @@ public class RestClient { private static class ClientHandler extends SimpleChannelInboundHandler<Object> { - private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>(); + private final CompletableFuture<JsonResponse> jsonFuture = new CompletableFuture<>(); - CompletableFuture<JsonNode> getJsonFuture() { + CompletableFuture<JsonResponse> getJsonFuture() { return jsonFuture; } @@ -210,7 +219,18 @@ public class RestClient { readRawResponse((FullHttpResponse) msg); } else { LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse."); - jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse.")); + if (msg instanceof HttpResponse) { + jsonFuture.completeExceptionally( + new RestClientException( + "Implementation error: Received a response that wasn't a FullHttpResponse.", + ((HttpResponse) msg).getStatus())); + } else { + jsonFuture.completeExceptionally( + new RestClientException( + "Implementation error: Received a response that wasn't a FullHttpResponse.", + HttpResponseStatus.INTERNAL_SERVER_ERROR)); + } + } ctx.close(); } @@ -225,14 +245,32 @@ public class RestClient { LOG.debug("Received response {}.", rawResponse); } catch (JsonParseException je) { LOG.error("Response was not valid JSON.", je); - jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je)); + jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je, msg.getStatus())); return; } catch (IOException ioe) { LOG.error("Response could not be read.", ioe); - jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe)); + jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe, msg.getStatus())); return; } - jsonFuture.complete(rawResponse); + jsonFuture.complete(new JsonResponse(rawResponse, msg.getStatus())); + } + } + + private static final class JsonResponse { + private final JsonNode json; + private final HttpResponseStatus httpResponseStatus; + + private JsonResponse(JsonNode json, HttpResponseStatus httpResponseStatus) { + this.json = Preconditions.checkNotNull(json); + this.httpResponseStatus = Preconditions.checkNotNull(httpResponseStatus); + } + + public JsonNode getJson() { + return json; + } + + public HttpResponseStatus getHttpResponseStatus() { + return httpResponseStatus; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java index 23e2918..2f2f9aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java @@ -135,7 +135,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo if (error != null) { if (error instanceof RestHandlerException) { RestHandlerException rhe = (RestHandlerException) error; - sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest); + sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus(), ctx, httpRequest); } else { log.error("Implementation error: Unhandled exception.", error); sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest); http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java index 9285f25..4cbb542 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java @@ -27,18 +27,18 @@ public class RestHandlerException extends Exception { private static final long serialVersionUID = -1358206297964070876L; private final String errorMessage; - private final HttpResponseStatus errorCode; + private final int responseCode; - public RestHandlerException(String errorMessage, HttpResponseStatus errorCode) { + public RestHandlerException(String errorMessage, HttpResponseStatus httpResponseStatus) { this.errorMessage = errorMessage; - this.errorCode = errorCode; + this.responseCode = httpResponseStatus.code(); } public String getErrorMessage() { return errorMessage; } - public HttpResponseStatus getErrorCode() { - return errorCode; + public HttpResponseStatus getHttpResponseStatus() { + return HttpResponseStatus.valueOf(responseCode); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java index 9d86b47..2333614 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java @@ -19,6 +19,9 @@ package org.apache.flink.runtime.rest.util; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; /** * An exception that is thrown if the failure of a REST operation was detected on the client. @@ -27,15 +30,22 @@ public class RestClientException extends FlinkException { private static final long serialVersionUID = 937914622022344423L; - public RestClientException(String message) { + private final int responseCode; + + public RestClientException(String message, HttpResponseStatus responseStatus) { super(message); - } - public RestClientException(Throwable cause) { - super(cause); + Preconditions.checkNotNull(responseStatus); + responseCode = responseStatus.code(); } - public RestClientException(String message, Throwable cause) { + public RestClientException(String message, Throwable cause, HttpResponseStatus responseStatus) { super(message, cause); + + responseCode = responseStatus.code(); + } + + public HttpResponseStatus getHttpResponseStatus() { + return HttpResponseStatus.valueOf(responseCode); } }