Repository: flink
Updated Branches:
  refs/heads/master 0cf7f7666 -> d7cea586e


[FLINK-7519] Add HttpResponseStatus to RestClientException

Enrich the RestClientException with the HttpResponseStatus in case of failures.
This makes the exception handling on the client side easier.

This closes #4588.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7cea586
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7cea586
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7cea586

Branch: refs/heads/master
Commit: d7cea586ec60f89ed06baf7ab95ebcf54f42a537
Parents: 0cf7f76
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Aug 25 12:15:38 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sun Sep 3 23:16:22 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/RestClient.java   | 76 +++++++++++++++-----
 .../rest/handler/AbstractRestHandler.java       |  2 +-
 .../rest/handler/RestHandlerException.java      | 10 +--
 .../runtime/rest/util/RestClientException.java  | 20 ++++--
 4 files changed, 78 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 7422ece..ea266be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -48,6 +48,8 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRespon
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
@@ -85,15 +87,15 @@ public class RestClient {
                this.executor = Preconditions.checkNotNull(executor);
 
                SSLEngine sslEngine = configuration.getSslEngine();
-               ChannelInitializer initializer = new 
ChannelInitializer<SocketChannel>() {
+               ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
                        @Override
-                       protected void initChannel(SocketChannel ch) throws 
Exception {
+                       protected void initChannel(SocketChannel socketChannel) 
throws Exception {
                                // SSL should be the first handler in the 
pipeline
                                if (sslEngine != null) {
-                                       ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+                                       socketChannel.pipeline().addLast("ssl", 
new SslHandler(sslEngine));
                                }
 
-                               ch.pipeline()
+                               socketChannel.pipeline()
                                        .addLast(new HttpClientCodec())
                                        .addLast(new HttpObjectAggregator(1024 
* 1024))
                                        .addLast(new ClientHandler())
@@ -150,7 +152,7 @@ public class RestClient {
                httpRequest.headers()
                        .add(HttpHeaders.Names.CONTENT_LENGTH, 
payload.capacity())
                        .add(HttpHeaders.Names.CONTENT_TYPE, "application/json; 
charset=" + ConfigConstants.DEFAULT_CHARSET.name())
-                       .set(HttpHeaders.Names.HOST, targetAddress + ":" + 
targetPort)
+                       .set(HttpHeaders.Names.HOST, targetAddress + ':' + 
targetPort)
                        .set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
 
                return submitRequest(targetAddress, targetPort, httpRequest, 
messageHeaders.getResponseClass());
@@ -168,29 +170,36 @@ public class RestClient {
                        .thenApply((ChannelFuture::channel))
                        .thenCompose(channel -> {
                                ClientHandler handler = 
channel.pipeline().get(ClientHandler.class);
-                               CompletableFuture<JsonNode> future = 
handler.getJsonFuture();
+                               CompletableFuture<JsonResponse> future = 
handler.getJsonFuture();
                                channel.writeAndFlush(httpRequest);
-                               return future.thenComposeAsync(rawResponse -> 
parseResponse(rawResponse, responseClass), executor);
-                       });
+                               return future;
+                       }).thenComposeAsync(
+                               (JsonResponse rawResponse) -> 
parseResponse(rawResponse, responseClass),
+                               executor
+                       );
        }
 
-       private static <P extends ResponseBody> CompletableFuture<P> 
parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+       private static <P extends ResponseBody> CompletableFuture<P> 
parseResponse(JsonResponse rawResponse, Class<P> responseClass) {
                CompletableFuture<P> responseFuture = new CompletableFuture<>();
                try {
-                       P response = objectMapper.treeToValue(rawResponse, 
responseClass);
+                       P response = 
objectMapper.treeToValue(rawResponse.getJson(), responseClass);
                        responseFuture.complete(response);
                } catch (JsonProcessingException jpe) {
                        // the received response did not matched the expected 
response type
 
                        // lets see if it is an ErrorResponse instead
                        try {
-                               ErrorResponseBody error = 
objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
-                               responseFuture.completeExceptionally(new 
RestClientException(error.errors.toString()));
+                               ErrorResponseBody error = 
objectMapper.treeToValue(rawResponse.getJson(), ErrorResponseBody.class);
+                               responseFuture.completeExceptionally(new 
RestClientException(error.errors.toString(), 
rawResponse.getHttpResponseStatus()));
                        } catch (JsonProcessingException jpe2) {
                                // if this fails it is either the expected type 
or response type was wrong, most likely caused
                                // by a client/search MessageHeaders mismatch
                                LOG.error("Received response was neither of the 
expected type ({}) nor an error. Response={}", responseClass, rawResponse, 
jpe2);
-                               responseFuture.completeExceptionally(new 
RestClientException("Response was neither of the expected type(" + 
responseClass + ") nor an error.", jpe2));
+                               responseFuture.completeExceptionally(
+                                       new RestClientException(
+                                               "Response was neither of the 
expected type(" + responseClass + ") nor an error.",
+                                               jpe2,
+                                               
rawResponse.getHttpResponseStatus()));
                        }
                }
                return responseFuture;
@@ -198,9 +207,9 @@ public class RestClient {
 
        private static class ClientHandler extends 
SimpleChannelInboundHandler<Object> {
 
-               private final CompletableFuture<JsonNode> jsonFuture = new 
CompletableFuture<>();
+               private final CompletableFuture<JsonResponse> jsonFuture = new 
CompletableFuture<>();
 
-               CompletableFuture<JsonNode> getJsonFuture() {
+               CompletableFuture<JsonResponse> getJsonFuture() {
                        return jsonFuture;
                }
 
@@ -210,7 +219,18 @@ public class RestClient {
                                readRawResponse((FullHttpResponse) msg);
                        } else {
                                LOG.error("Implementation error: Received a 
response that wasn't a FullHttpResponse.");
-                               jsonFuture.completeExceptionally(new 
RestClientException("Implementation error: Received a response that wasn't a 
FullHttpResponse."));
+                               if (msg instanceof HttpResponse) {
+                                       jsonFuture.completeExceptionally(
+                                               new RestClientException(
+                                                       "Implementation error: 
Received a response that wasn't a FullHttpResponse.",
+                                                       ((HttpResponse) 
msg).getStatus()));
+                               } else {
+                                       jsonFuture.completeExceptionally(
+                                               new RestClientException(
+                                                       "Implementation error: 
Received a response that wasn't a FullHttpResponse.",
+                                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR));
+                               }
+
                        }
                        ctx.close();
                }
@@ -225,14 +245,32 @@ public class RestClient {
                                LOG.debug("Received response {}.", rawResponse);
                        } catch (JsonParseException je) {
                                LOG.error("Response was not valid JSON.", je);
-                               jsonFuture.completeExceptionally(new 
RestClientException("Response was not valid JSON.", je));
+                               jsonFuture.completeExceptionally(new 
RestClientException("Response was not valid JSON.", je, msg.getStatus()));
                                return;
                        } catch (IOException ioe) {
                                LOG.error("Response could not be read.", ioe);
-                               jsonFuture.completeExceptionally(new 
RestClientException("Response could not be read.", ioe));
+                               jsonFuture.completeExceptionally(new 
RestClientException("Response could not be read.", ioe, msg.getStatus()));
                                return;
                        }
-                       jsonFuture.complete(rawResponse);
+                       jsonFuture.complete(new JsonResponse(rawResponse, 
msg.getStatus()));
+               }
+       }
+
+       private static final class JsonResponse {
+               private final JsonNode json;
+               private final HttpResponseStatus httpResponseStatus;
+
+               private JsonResponse(JsonNode json, HttpResponseStatus 
httpResponseStatus) {
+                       this.json = Preconditions.checkNotNull(json);
+                       this.httpResponseStatus = 
Preconditions.checkNotNull(httpResponseStatus);
+               }
+
+               public JsonNode getJson() {
+                       return json;
+               }
+
+               public HttpResponseStatus getHttpResponseStatus() {
+                       return httpResponseStatus;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 23e2918..2f2f9aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -135,7 +135,7 @@ public abstract class AbstractRestHandler<R extends 
RequestBody, P extends Respo
                                if (error != null) {
                                        if (error instanceof 
RestHandlerException) {
                                                RestHandlerException rhe = 
(RestHandlerException) error;
-                                               sendErrorResponse(new 
ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
+                                               sendErrorResponse(new 
ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus(), ctx, 
httpRequest);
                                        } else {
                                                log.error("Implementation 
error: Unhandled exception.", error);
                                                sendErrorResponse(new 
ErrorResponseBody("Internal server error."), 
HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);

http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index 9285f25..4cbb542 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -27,18 +27,18 @@ public class RestHandlerException extends Exception {
        private static final long serialVersionUID = -1358206297964070876L;
 
        private final String errorMessage;
-       private final HttpResponseStatus errorCode;
+       private final int responseCode;
 
-       public RestHandlerException(String errorMessage, HttpResponseStatus 
errorCode) {
+       public RestHandlerException(String errorMessage, HttpResponseStatus 
httpResponseStatus) {
                this.errorMessage = errorMessage;
-               this.errorCode = errorCode;
+               this.responseCode = httpResponseStatus.code();
        }
 
        public String getErrorMessage() {
                return errorMessage;
        }
 
-       public HttpResponseStatus getErrorCode() {
-               return errorCode;
+       public HttpResponseStatus getHttpResponseStatus() {
+               return HttpResponseStatus.valueOf(responseCode);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
index 9d86b47..2333614 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -19,6 +19,9 @@
 package org.apache.flink.runtime.rest.util;
 
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /**
  * An exception that is thrown if the failure of a REST operation was detected 
on the client.
@@ -27,15 +30,22 @@ public class RestClientException extends FlinkException {
 
        private static final long serialVersionUID = 937914622022344423L;
 
-       public RestClientException(String message) {
+       private final int responseCode;
+
+       public RestClientException(String message, HttpResponseStatus 
responseStatus) {
                super(message);
-       }
 
-       public RestClientException(Throwable cause) {
-               super(cause);
+               Preconditions.checkNotNull(responseStatus);
+               responseCode = responseStatus.code();
        }
 
-       public RestClientException(String message, Throwable cause) {
+       public RestClientException(String message, Throwable cause, 
HttpResponseStatus responseStatus) {
                super(message, cause);
+
+               responseCode = responseStatus.code();
+       }
+
+       public HttpResponseStatus getHttpResponseStatus() {
+               return HttpResponseStatus.valueOf(responseCode);
        }
 }

Reply via email to