Repository: flink
Updated Branches:
  refs/heads/release-1.5 9551c6fa9 -> 23a0917cc


[FLINK-7521][flip6] Return HTTP 413 if request limit is exceeded.

Remove unnecessary PipelineErrorHandler from RestClient.
Rename config keys for configuring request and response limits.
Set response headers for all error responses.

This closes #5685.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23a0917c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23a0917c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23a0917c

Branch: refs/heads/release-1.5
Commit: 23a0917ccd44689ee06379e7a402149940b0e79c
Parents: 8169ff5
Author: gyao <[email protected]>
Authored: Mon Mar 12 23:16:25 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 13 19:00:32 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/RestOptions.java |  18 +--
 .../dispatcher/DispatcherRestEndpoint.java      |   2 -
 .../runtime/rest/FlinkHttpObjectAggregator.java |  67 +++++++++++
 .../apache/flink/runtime/rest/RestClient.java   |  16 ++-
 .../runtime/rest/RestClientConfiguration.java   |   8 +-
 .../flink/runtime/rest/RestServerEndpoint.java  |  10 +-
 .../rest/RestServerEndpointConfiguration.java   |  34 +++++-
 .../rest/handler/PipelineErrorHandler.java      |  16 ++-
 .../rest/handler/RestHandlerConfiguration.java  |  22 +---
 .../runtime/rest/handler/RouterHandler.java     |  13 ++-
 .../runtime/rest/handler/util/HandlerUtils.java |  56 ++++++++-
 .../runtime/webmonitor/WebMonitorEndpoint.java  |   2 -
 .../runtime/rest/RestServerEndpointITCase.java  | 113 ++++++++++++++++---
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |   2 +-
 ...ExecutionAttemptAccumulatorsHandlerTest.java |   3 +-
 ...btaskExecutionAttemptDetailsHandlerTest.java |   2 +-
 16 files changed, 304 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 61bb085..94d7977 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -83,17 +83,19 @@ public class RestOptions {
                        .withDescription("The maximum time in ms for the client 
to establish a TCP connection.");
 
        /**
-        * The max content length that the server will handle.
+        * The maximum content length that the server will handle.
         */
-       public static final ConfigOption<Integer> REST_SERVER_CONTENT_MAX_MB =
-               key("rest.server.content.max.mb")
-                       .defaultValue(10);
+       public static final ConfigOption<Integer> 
REST_SERVER_MAX_CONTENT_LENGTH =
+               key("rest.server.max-content-length")
+                       .defaultValue(104_857_600)
+                       .withDescription("The maximum content length in bytes 
that the server will handle.");
 
        /**
-        * The max content length that the client will handle.
+        * The maximum content length that the client will handle.
         */
-       public static final ConfigOption<Integer> REST_CLIENT_CONTENT_MAX_MB =
-               key("rest.client.content.max.mb")
-                       .defaultValue(1);
+       public static final ConfigOption<Integer> 
REST_CLIENT_MAX_CONTENT_LENGTH =
+               key("rest.client.max-content-length")
+                       .defaultValue(104_857_600)
+                       .withDescription("The maximum content length in bytes 
that the client will handle.");
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 9df6dee..4518552 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -92,7 +91,6 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                // Add the Dispatcher specific handlers
 
                final Time timeout = restConfiguration.getTimeout();
-               final Map<String, String> responseHeaders = 
restConfiguration.getResponseHeaders();
 
                BlobServerPortHandler blobServerPortHandler = new 
BlobServerPortHandler(
                        restAddressFuture,

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
new file mode 100644
index 0000000..4ee0256
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Same as {@link 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectDecoder}
+ * but returns HTTP 413 to the client if the payload exceeds {@link 
#maxContentLength}.
+ */
+public class FlinkHttpObjectAggregator extends 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator 
{
+
+       private final Map<String, String> responseHeaders;
+
+       public FlinkHttpObjectAggregator(final int maxContentLength, @Nonnull 
final Map<String, String> responseHeaders) {
+               super(maxContentLength);
+               this.responseHeaders = responseHeaders;
+       }
+
+       @Override
+       protected void decode(
+                       final ChannelHandlerContext ctx,
+                       final HttpObject msg,
+                       final List<Object> out) throws Exception {
+
+               try {
+                       super.decode(ctx, msg, out);
+               } catch (final TooLongFrameException e) {
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               false,
+                               new ErrorResponseBody(String.format(
+                                       e.getMessage() + " Try to raise [%s]",
+                                       
RestOptions.REST_SERVER_MAX_CONTENT_LENGTH.key())),
+                               HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE,
+                               responseHeaders);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 801119d..6319634 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -50,6 +50,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandl
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
@@ -104,8 +105,7 @@ public class RestClient {
                                socketChannel.pipeline()
                                        .addLast(new HttpClientCodec())
                                        .addLast(new 
HttpObjectAggregator(configuration.getMaxContentLength()))
-                                       .addLast(new ClientHandler())
-                                       .addLast(new PipelineErrorHandler(LOG));
+                                       .addLast(new ClientHandler());
                        }
                };
                NioEventLoopGroup group = new NioEventLoopGroup(1, new 
DefaultThreadFactory("flink-rest-client-netty"));
@@ -269,8 +269,14 @@ public class RestClient {
                }
 
                @Override
-               public void exceptionCaught(final ChannelHandlerContext ctx, 
final Throwable cause) throws Exception {
-                       jsonFuture.completeExceptionally(cause);
+               public void exceptionCaught(final ChannelHandlerContext ctx, 
final Throwable cause) {
+                       if (cause instanceof TooLongFrameException) {
+                               jsonFuture.completeExceptionally(new 
TooLongFrameException(String.format(
+                                       cause.getMessage() + " Try to raise 
[%s]",
+                                       
RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH.key())));
+                       } else {
+                               jsonFuture.completeExceptionally(cause);
+                       }
                        ctx.close();
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index 782cb4e..17d4264 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -29,6 +29,8 @@ import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A configuration object for {@link RestClient}s.
  */
@@ -45,6 +47,7 @@ public final class RestClientConfiguration {
                        @Nullable final SSLEngine sslEngine,
                        final long connectionTimeout,
                        final int maxContentLength) {
+               checkArgument(maxContentLength > 0, "maxContentLength must be 
positive, was: %d", maxContentLength);
                this.sslEngine = sslEngine;
                this.connectionTimeout = connectionTimeout;
                this.maxContentLength = maxContentLength;
@@ -104,10 +107,7 @@ public final class RestClientConfiguration {
 
                final long connectionTimeout = 
config.getLong(RestOptions.CONNECTION_TIMEOUT);
 
-               int maxContentLength = 
config.getInteger(RestOptions.REST_CLIENT_CONTENT_MAX_MB) * 1024 * 1024;
-               if (maxContentLength <= 0) {
-                       throw new ConfigurationException("Max content length 
for client must be a positive integer: " + maxContentLength);
-               }
+               int maxContentLength = 
config.getInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH);
 
                return new RestClientConfiguration(sslEngine, 
connectionTimeout, maxContentLength);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 8b39250..a3d4843 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
@@ -59,6 +58,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
@@ -78,6 +78,7 @@ public abstract class RestServerEndpoint {
        private final SSLEngine sslEngine;
        private final Path uploadDir;
        private final int maxContentLength;
+       protected final Map<String, String> responseHeaders;
 
        private final CompletableFuture<Void> terminationFuture;
 
@@ -97,6 +98,7 @@ public abstract class RestServerEndpoint {
                createUploadDir(uploadDir, log);
 
                this.maxContentLength = configuration.getMaxContentLength();
+               this.responseHeaders = configuration.getResponseHeaders();
 
                terminationFuture = new CompletableFuture<>();
 
@@ -148,7 +150,7 @@ public abstract class RestServerEndpoint {
 
                                @Override
                                protected void initChannel(SocketChannel ch) {
-                                       Handler handler = new 
RouterHandler(router);
+                                       Handler handler = new 
RouterHandler(router, responseHeaders);
 
                                        // SSL should be the first handler in 
the pipeline
                                        if (sslEngine != null) {
@@ -158,9 +160,9 @@ public abstract class RestServerEndpoint {
                                        ch.pipeline()
                                                .addLast(new HttpServerCodec())
                                                .addLast(new 
FileUploadHandler(uploadDir))
-                                               .addLast(new 
HttpObjectAggregator(maxContentLength))
+                                               .addLast(new 
FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                                                .addLast(handler.name(), 
handler)
-                                               .addLast(new 
PipelineErrorHandler(log));
+                                               .addLast(new 
PipelineErrorHandler(log, responseHeaders));
                                }
                        };
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 3685e2d..35bd6ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -26,12 +26,16 @@ import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 import static java.util.Objects.requireNonNull;
@@ -53,20 +57,24 @@ public final class RestServerEndpointConfiguration {
 
        private final int maxContentLength;
 
+       private final Map<String, String> responseHeaders;
+
        private RestServerEndpointConfiguration(
                        @Nullable String restBindAddress,
                        int restBindPort,
                        @Nullable SSLEngine sslEngine,
                        final Path uploadDir,
-                       final int maxContentLength) {
+                       final int maxContentLength, final Map<String, String> 
responseHeaders) {
 
                Preconditions.checkArgument(0 <= restBindPort && restBindPort < 
65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
+               Preconditions.checkArgument(maxContentLength > 0, 
"maxContentLength must be positive, was: %d", maxContentLength);
 
                this.restBindAddress = restBindAddress;
                this.restBindPort = restBindPort;
                this.sslEngine = sslEngine;
                this.uploadDir = requireNonNull(uploadDir);
                this.maxContentLength = maxContentLength;
+               this.responseHeaders = 
requireNonNull(Collections.unmodifiableMap(responseHeaders));
        }
 
        /**
@@ -113,6 +121,13 @@ public final class RestServerEndpointConfiguration {
        }
 
        /**
+        * Response headers that should be added to every HTTP response.
+        */
+       public Map<String, String> getResponseHeaders() {
+               return responseHeaders;
+       }
+
+       /**
         * Creates and returns a new {@link RestServerEndpointConfiguration} 
from the given {@link Configuration}.
         *
         * @param config configuration from which the REST server endpoint 
configuration should be created from
@@ -144,11 +159,18 @@ public final class RestServerEndpointConfiguration {
                        config.getString(WebOptions.UPLOAD_DIR, 
config.getString(WebOptions.TMP_DIR)),
                        "flink-web-upload-" + UUID.randomUUID());
 
-               int maxContentLength = 
config.getInteger(RestOptions.REST_SERVER_CONTENT_MAX_MB) * 1024 * 1024;
-               if (maxContentLength <= 0) {
-                       throw new ConfigurationException("Max content length 
for server must be a positive integer: " + maxContentLength);
-               }
+               int maxContentLength = 
config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
+
+               final Map<String, String> responseHeaders = 
Collections.singletonMap(
+                       HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
+                       
config.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN));
 
-               return new RestServerEndpointConfiguration(address, port, 
sslEngine, uploadDir, maxContentLength);
+               return new RestServerEndpointConfiguration(
+                       address,
+                       port,
+                       sslEngine,
+                       uploadDir,
+                       maxContentLength,
+                       responseHeaders);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index 046118a..a16b01f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -30,6 +30,9 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.slf4j.Logger;
 
 import java.util.Collections;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * This is the last handler in the pipeline. It logs all error messages.
@@ -40,8 +43,11 @@ public class PipelineErrorHandler extends 
SimpleChannelInboundHandler<HttpReques
        /** The logger to which the handler writes the log statements. */
        private final Logger logger;
 
-       public PipelineErrorHandler(Logger logger) {
-               this.logger = logger;
+       private final Map<String, String> responseHeaders;
+
+       public PipelineErrorHandler(Logger logger, final Map<String, String> 
responseHeaders) {
+               this.logger = requireNonNull(logger);
+               this.responseHeaders = requireNonNull(responseHeaders);
        }
 
        @Override
@@ -59,5 +65,11 @@ public class PipelineErrorHandler extends 
SimpleChannelInboundHandler<HttpReques
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
{
                logger.warn("Unhandled exception", cause);
+               HandlerUtils.sendErrorResponse(
+                       ctx,
+                       false,
+                       new ErrorResponseBody("Internal server error: " + 
cause.getMessage()),
+                       HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                       responseHeaders);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index acdd63c..f92946b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -23,11 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-
 import java.io.File;
-import java.util.Collections;
-import java.util.Map;
 import java.util.UUID;
 
 /**
@@ -43,14 +39,11 @@ public class RestHandlerConfiguration {
 
        private final File tmpDir;
 
-       private final Map<String, String> responseHeaders;
-
        public RestHandlerConfiguration(
                        long refreshInterval,
                        int maxCheckpointStatisticCacheEntries,
                        Time timeout,
-                       File tmpDir,
-                       Map<String, String> responseHeaders) {
+                       File tmpDir) {
                Preconditions.checkArgument(refreshInterval > 0L, "The refresh 
interval (ms) should be larger than 0.");
                this.refreshInterval = refreshInterval;
 
@@ -58,8 +51,6 @@ public class RestHandlerConfiguration {
 
                this.timeout = Preconditions.checkNotNull(timeout);
                this.tmpDir = Preconditions.checkNotNull(tmpDir);
-
-               this.responseHeaders = 
Preconditions.checkNotNull(responseHeaders);
        }
 
        public long getRefreshInterval() {
@@ -78,10 +69,6 @@ public class RestHandlerConfiguration {
                return tmpDir;
        }
 
-       public Map<String, String> getResponseHeaders() {
-               return Collections.unmodifiableMap(responseHeaders);
-       }
-
        public static RestHandlerConfiguration fromConfiguration(Configuration 
configuration) {
                final long refreshInterval = 
configuration.getLong(WebOptions.REFRESH_INTERVAL);
 
@@ -92,15 +79,10 @@ public class RestHandlerConfiguration {
                final String rootDir = "flink-web-" + UUID.randomUUID();
                final File tmpDir = new 
File(configuration.getString(WebOptions.TMP_DIR), rootDir);
 
-               final Map<String, String> responseHeaders = 
Collections.singletonMap(
-                       HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
-                       
configuration.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN));
-
                return new RestHandlerConfiguration(
                        refreshInterval,
                        maxCheckpointStatisticCacheEntries,
                        timeout,
-                       tmpDir,
-                       responseHeaders);
+                       tmpDir);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
index d1d0837..fc02250 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
@@ -27,20 +27,21 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Map;
 
-import java.util.Collections;
+import static java.util.Objects.requireNonNull;
 
 /**
  * This class is an extension of {@link Handler} that replaces the standard 
error response to be identical with those
  * sent by the {@link AbstractRestHandler}.
  */
 public class RouterHandler extends Handler {
-       private static final Logger LOG = 
LoggerFactory.getLogger(RouterHandler.class);
 
-       public RouterHandler(Router router) {
+       private final Map<String, String> responseHeaders;
+
+       public RouterHandler(Router router, final Map<String, String> 
responseHeaders) {
                super(router);
+               this.responseHeaders = requireNonNull(responseHeaders);
        }
 
        @Override
@@ -50,6 +51,6 @@ public class RouterHandler extends Handler {
                        request,
                        new ErrorResponseBody("Not found."),
                        HttpResponseStatus.NOT_FOUND,
-                       Collections.emptyMap());
+                       responseHeaders);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index a69f4aa..b407ada 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -112,6 +112,30 @@ public class HandlerUtils {
                        HttpResponseStatus statusCode,
                        Map<String, String> headers) {
 
+               sendErrorResponse(
+                       channelHandlerContext,
+                       HttpHeaders.isKeepAlive(httpRequest),
+                       errorMessage,
+                       statusCode,
+                       headers);
+       }
+
+       /**
+        * Sends the given error response and status code to the given channel.
+        *
+        * @param channelHandlerContext identifying the open channel
+        * @param keepAlive If the connection should be kept alive.
+        * @param errorMessage which should be sent
+        * @param statusCode of the message to send
+        * @param headers additional header values
+        */
+       public static void sendErrorResponse(
+                       ChannelHandlerContext channelHandlerContext,
+                       boolean keepAlive,
+                       ErrorResponseBody errorMessage,
+                       HttpResponseStatus statusCode,
+                       Map<String, String> headers) {
+
                StringWriter sw = new StringWriter();
                try {
                        mapper.writeValue(sw, errorMessage);
@@ -120,14 +144,14 @@ public class HandlerUtils {
                        LOG.error("Internal server error. Could not map error 
response to JSON.", e);
                        sendResponse(
                                channelHandlerContext,
-                               httpRequest,
+                               keepAlive,
                                "Internal server error. Could not map error 
response to JSON.",
                                HttpResponseStatus.INTERNAL_SERVER_ERROR,
                                headers);
                }
                sendResponse(
                        channelHandlerContext,
-                       httpRequest,
+                       keepAlive,
                        sw.toString(),
                        statusCode,
                        headers);
@@ -148,6 +172,30 @@ public class HandlerUtils {
                        @Nonnull String message,
                        @Nonnull HttpResponseStatus statusCode,
                        @Nonnull Map<String, String> headers) {
+
+               sendResponse(
+                       channelHandlerContext,
+                       HttpHeaders.isKeepAlive(httpRequest),
+                       message,
+                       statusCode,
+                       headers);
+       }
+
+       /**
+        * Sends the given response and status code to the given channel.
+        *
+        * @param channelHandlerContext identifying the open channel
+        * @param keepAlive If the connection should be kept alive.
+        * @param message which should be sent
+        * @param statusCode of the message to send
+        * @param headers additional header values
+        */
+       public static void sendResponse(
+                       @Nonnull ChannelHandlerContext channelHandlerContext,
+                       boolean keepAlive,
+                       @Nonnull String message,
+                       @Nonnull HttpResponseStatus statusCode,
+                       @Nonnull Map<String, String> headers) {
                HttpResponse response = new DefaultHttpResponse(HTTP_1_1, 
statusCode);
 
                response.headers().set(CONTENT_TYPE, 
RestConstants.REST_CONTENT_TYPE);
@@ -156,7 +204,7 @@ public class HandlerUtils {
                        response.headers().set(headerEntry.getKey(), 
headerEntry.getValue());
                }
 
-               if (HttpHeaders.isKeepAlive(httpRequest)) {
+               if (keepAlive) {
                        response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
                }
 
@@ -172,7 +220,7 @@ public class HandlerUtils {
                ChannelFuture lastContentFuture = 
channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
 
                // close the connection, if no keep-alive is needed
-               if (!HttpHeaders.isKeepAlive(httpRequest)) {
+               if (!keepAlive) {
                        
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 10a3650..dfb2fc8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -129,7 +129,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -199,7 +198,6 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                ArrayList<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = new ArrayList<>(30);
 
                final Time timeout = restConfiguration.getTimeout();
-               final Map<String, String> responseHeaders = 
restConfiguration.getResponseHeaders();
 
                ClusterOverviewHandler clusterOverviewHandler = new 
ClusterOverviewHandler(
                        restAddressFuture,

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index c9817ff..32f3ec8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -51,10 +51,10 @@ import org.apache.flink.util.TestLogger;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -81,7 +81,12 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -96,6 +101,7 @@ public class RestServerEndpointITCase extends TestLogger {
        private static final JobID QUERY_JOB_ID = new JobID();
        private static final String JOB_ID_KEY = "jobid";
        private static final Time timeout = Time.seconds(10L);
+       private static final int TEST_REST_MAX_CONTENT_LENGTH = 4096;
 
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -103,12 +109,15 @@ public class RestServerEndpointITCase extends TestLogger {
        private RestServerEndpoint serverEndpoint;
        private RestClient restClient;
        private TestUploadHandler testUploadHandler;
+       private InetSocketAddress serverAddress;
 
        @Before
        public void setup() throws Exception {
                Configuration config = new Configuration();
                config.setInteger(RestOptions.REST_PORT, 0);
                config.setString(WebOptions.UPLOAD_DIR, 
temporaryFolder.newFolder().getCanonicalPath());
+               config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, 
TEST_REST_MAX_CONTENT_LENGTH);
+               config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, 
TEST_REST_MAX_CONTENT_LENGTH);
 
                RestServerEndpointConfiguration serverConfig = 
RestServerEndpointConfiguration.fromConfiguration(config);
                RestClientConfiguration clientConfig = 
RestClientConfiguration.fromConfiguration(config);
@@ -133,6 +142,7 @@ public class RestServerEndpointITCase extends TestLogger {
                restClient = new TestRestClient(clientConfig);
 
                serverEndpoint.start();
+               serverAddress = serverEndpoint.getServerAddress();
        }
 
        @After
@@ -161,7 +171,6 @@ public class RestServerEndpointITCase extends TestLogger {
 
                // send first request and wait until the handler blocks
                CompletableFuture<TestResponse> response1;
-               final InetSocketAddress serverAddress = 
serverEndpoint.getServerAddress();
 
                synchronized (TestHandler.LOCK) {
                        response1 = restClient.sendRequest(
@@ -198,8 +207,6 @@ public class RestServerEndpointITCase extends TestLogger {
         */
        @Test
        public void testBadHandlerRequest() throws Exception {
-               final InetSocketAddress serverAddress = 
serverEndpoint.getServerAddress();
-
                final FaultyTestParameters parameters = new 
FaultyTestParameters();
 
                parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
@@ -215,11 +222,11 @@ public class RestServerEndpointITCase extends TestLogger {
                try {
                        response.get();
 
-                       Assert.fail("The request should fail with a bad request 
return code.");
+                       fail("The request should fail with a bad request return 
code.");
                } catch (ExecutionException ee) {
                        Throwable t = 
ExceptionUtils.stripExecutionException(ee);
 
-                       Assert.assertTrue(t instanceof RestClientException);
+                       assertTrue(t instanceof RestClientException);
 
                        RestClientException rce = (RestClientException) t;
 
@@ -228,6 +235,50 @@ public class RestServerEndpointITCase extends TestLogger {
        }
 
        /**
+        * Tests that requests and responses larger than {@link 
#TEST_REST_MAX_CONTENT_LENGTH}
+        * are rejected by the server and client, respectively.
+        */
+       @Test
+       public void testMaxContentLengthLimit() throws Exception {
+               final TestParameters parameters = new TestParameters();
+               parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+               
parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+
+               CompletableFuture<TestResponse> response;
+               response = restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       new TestHeaders(),
+                       parameters,
+                       new TestRequest(2, 
createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+
+               try {
+                       response.get();
+                       fail("Expected exception not thrown");
+               } catch (final ExecutionException e) {
+                       final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
+                       assertThat(throwable, 
instanceOf(RestClientException.class));
+                       assertThat(throwable.getMessage(), containsString("Try 
to raise"));
+               }
+
+               response = restClient.sendRequest(
+                       serverAddress.getHostName(),
+                       serverAddress.getPort(),
+                       new TestHeaders(),
+                       parameters,
+                       new TestRequest(TestHandler.LARGE_RESPONSE_BODY_ID));
+
+               try {
+                       response.get();
+                       fail("Expected exception not thrown");
+               } catch (final ExecutionException e) {
+                       final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
+                       assertThat(throwable, 
instanceOf(TooLongFrameException.class));
+                       assertThat(throwable.getMessage(), containsString("Try 
to raise"));
+               }
+       }
+
+       /**
         * Tests that multipart/form-data uploads work correctly.
         *
         * @see FileUploadHandler
@@ -294,6 +345,14 @@ public class RestServerEndpointITCase extends TestLogger {
                return Long.toHexString(System.currentTimeMillis());
        }
 
+       private static String createStringOfSize(int size) {
+               StringBuilder sb = new StringBuilder(size);
+               for (int i = 0; i < size; i++) {
+                       sb.append('a');
+               }
+               return sb.toString();
+       }
+
        private static class TestRestServerEndpoint extends RestServerEndpoint {
 
                private final TestHandler testHandler;
@@ -323,12 +382,14 @@ public class RestServerEndpointITCase extends TestLogger {
 
        private static class TestHandler extends 
AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
 
-               public static final Object LOCK = new Object();
+               private static final Object LOCK = new Object();
+
+               private static final int LARGE_RESPONSE_BODY_ID = 3;
 
                TestHandler(
-                       CompletableFuture<String> localAddressFuture,
-                       GatewayRetriever<RestfulGateway> leaderRetriever,
-                       Time timeout) {
+                               CompletableFuture<String> localAddressFuture,
+                               GatewayRetriever<RestfulGateway> 
leaderRetriever,
+                               Time timeout) {
                        super(
                                localAddressFuture,
                                leaderRetriever,
@@ -342,7 +403,8 @@ public class RestServerEndpointITCase extends TestLogger {
                        
assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
                        
assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), 
QUERY_JOB_ID);
 
-                       if (request.getRequestBody().id == 1) {
+                       final int id = request.getRequestBody().id;
+                       if (id == 1) {
                                synchronized (LOCK) {
                                        try {
                                                LOCK.notifyAll();
@@ -350,8 +412,12 @@ public class RestServerEndpointITCase extends TestLogger {
                                        } catch (InterruptedException ignored) {
                                        }
                                }
+                       } else if (id == LARGE_RESPONSE_BODY_ID) {
+                               return CompletableFuture.completedFuture(new 
TestResponse(
+                                       id,
+                                       
createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
                        }
-                       return CompletableFuture.completedFuture(new 
TestResponse(request.getRequestBody().id));
+                       return CompletableFuture.completedFuture(new 
TestResponse(id));
                }
        }
 
@@ -365,18 +431,37 @@ public class RestServerEndpointITCase extends TestLogger {
        private static class TestRequest implements RequestBody {
                public final int id;
 
+               public final String content;
+
+               public TestRequest(int id) {
+                       this(id, null);
+               }
+
                @JsonCreator
-               public TestRequest(@JsonProperty("id") int id) {
+               public TestRequest(
+                               @JsonProperty("id") int id,
+                               @JsonProperty("content") final String content) {
                        this.id = id;
+                       this.content = content;
                }
        }
 
        private static class TestResponse implements ResponseBody {
+
                public final int id;
 
+               public final String content;
+
+               public TestResponse(int id) {
+                       this(id, null);
+               }
+
                @JsonCreator
-               public TestResponse(@JsonProperty("id") int id) {
+               public TestResponse(
+                               @JsonProperty("id") int id,
+                               @JsonProperty("content") String content) {
                        this.id = id;
+                       this.content = content;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 997601f..af8b995 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -126,7 +126,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest 
extends TestLogger {
                        CompletableFuture.completedFuture("127.0.0.1:9527"),
                        () -> null,
                        Time.milliseconds(100),
-                       restHandlerConfiguration.getResponseHeaders(),
+                       Collections.emptyMap(),
                        SubtaskCurrentAttemptDetailsHeaders.getInstance(),
                        new ExecutionGraphCache(
                                restHandlerConfiguration.getTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 5f03c55..318541d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -62,7 +63,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest 
extends TestLogger {
                        CompletableFuture.completedFuture("127.0.0.1:9527"),
                        () -> null,
                        Time.milliseconds(100L),
-                       restHandlerConfiguration.getResponseHeaders(),
+                       Collections.emptyMap(),
                        
SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(),
                        new ExecutionGraphCache(
                                restHandlerConfiguration.getTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/23a0917c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index d55ab77..8e44c0e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -129,7 +129,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest 
extends TestLogger {
                        CompletableFuture.completedFuture("127.0.0.1:9527"),
                        () -> null,
                        Time.milliseconds(100L),
-                       restHandlerConfiguration.getResponseHeaders(),
+                       Collections.emptyMap(),
                        SubtaskExecutionAttemptDetailsHeaders.getInstance(),
                        new ExecutionGraphCache(
                                restHandlerConfiguration.getTimeout(),

Reply via email to