This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 646ff2d  [FLINK-21108][web] Add custom netty HTTP request 
inbound/outbound handlers
646ff2d is described below

commit 646ff2d36f40704f5dca017b8fffed78bd51b307
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Jun 23 17:08:44 2021 +0200

    [FLINK-21108][web] Add custom netty HTTP request inbound/outbound handlers
    
    Closes #16463
---
 .../client/program/rest/RestClusterClient.java     |   5 +-
 .../RestClusterClientSavepointTriggerTest.java     |  10 +-
 .../client/program/rest/RestClusterClientTest.java |  11 +-
 .../flink/docs/rest/RestAPIDocGenerator.java       |   3 +-
 .../util/flink/LocalStandaloneFlinkResource.java   |   5 +-
 .../metrics/tests/MetricsAvailabilityITCase.java   |   5 +-
 .../webmonitor/utils/WebFrontendBootstrap.java     |  48 +++++-
 .../rest/compatibility/RestAPIStabilityTest.java   |   3 +-
 .../webmonitor/history/HistoryServerTest.java      |   2 +-
 .../webmonitor/utils/WebFrontendBootstrapTest.java |  87 ++++++++++
 .../runtime/dispatcher/DispatcherRestEndpoint.java |   6 +-
 .../netty/InboundChannelHandlerFactory.java        |  57 ++++++
 .../netty/OutboundChannelHandlerFactory.java       |  55 ++++++
 .../jobmaster/MiniDispatcherRestEndpoint.java      |   6 +-
 .../flink/runtime/rest/JobRestEndpointFactory.java |   1 -
 .../org/apache/flink/runtime/rest/RestClient.java  |  60 ++++++-
 .../flink/runtime/rest/RestServerEndpoint.java     |  64 +++++--
 .../runtime/rest/SessionRestEndpointFactory.java   |   1 -
 .../runtime/rest/handler/router/RouterHandler.java |   2 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |   7 +-
 .../netty/Prio0InboundChannelHandlerFactory.java   |  70 ++++++++
 .../netty/Prio1InboundChannelHandlerFactory.java   |  42 +++++
 .../runtime/rest/MultipartUploadResource.java      |   5 +-
 .../rest/Prio0OutboundChannelHandlerFactory.java   |  66 +++++++
 .../rest/Prio1OutboundChannelHandlerFactory.java   |  41 +++++
 .../runtime/rest/RestClientMultipartTest.java      |   5 +-
 .../apache/flink/runtime/rest/RestClientTest.java  |  17 +-
 .../runtime/rest/RestExternalHandlersITCase.java   | 192 +++++++++++++++++++++
 .../runtime/rest/RestServerEndpointITCase.java     |  29 +---
 .../runtime/rest/RestServerSSLAuthITCase.java      |   9 +-
 .../rest/handler/AbstractHandlerITCase.java        |   9 +-
 .../util/DocumentingDispatcherRestEndpoint.java    |  12 +-
 .../runtime/rest/util/TestRestServerEndpoint.java  |  15 +-
 .../runtime/webmonitor/WebMonitorEndpointTest.java |   2 -
 ...e.io.network.netty.InboundChannelHandlerFactory |  17 ++
 ....io.network.netty.OutboundChannelHandlerFactory |  17 ++
 .../flink/test/checkpointing/SavepointITCase.java  |   4 +-
 .../recovery/BatchFineGrainedRecoveryITCase.java   |   5 +-
 .../yarn/YARNSessionCapacitySchedulerITCase.java   |   6 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |   5 +-
 40 files changed, 847 insertions(+), 159 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 60a02ac..68418ef 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -207,10 +207,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
         if (restClient != null) {
             this.restClient = restClient;
         } else {
-            this.restClient =
-                    new RestClient(
-                            
restClusterClientConfiguration.getRestClientConfiguration(),
-                            executorService);
+            this.restClient = new RestClient(configuration, executorService);
         }
 
         this.waitStrategy = checkNotNull(waitStrategy);
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
index 65345b3..946c8dc 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
@@ -26,9 +26,7 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -94,8 +92,6 @@ public class RestClusterClientSavepointTriggerTest extends 
TestLogger {
     private static final GatewayRetriever<DispatcherGateway> 
mockGatewayRetriever =
             () -> CompletableFuture.completedFuture(mockRestfulGateway);
 
-    private static RestServerEndpointConfiguration 
restServerEndpointConfiguration;
-
     private static ExecutorService executor;
 
     private static final Configuration REST_CONFIG;
@@ -112,8 +108,6 @@ public class RestClusterClientSavepointTriggerTest extends 
TestLogger {
 
     @BeforeClass
     public static void setUp() throws ConfigurationException {
-        restServerEndpointConfiguration =
-                RestServerEndpointConfiguration.fromConfiguration(REST_CONFIG);
         executor =
                 Executors.newSingleThreadExecutor(
                         new ExecutorThreadFactory(
@@ -273,7 +267,7 @@ public class RestClusterClientSavepointTriggerTest extends 
TestLogger {
             final FunctionWithException<TriggerId, SavepointInfo, 
RestHandlerException>
                     savepointHandlerLogic)
             throws Exception {
-        return TestRestServerEndpoint.builder(restServerEndpointConfiguration)
+        return TestRestServerEndpoint.builder(REST_CONFIG)
                 .withHandler(new 
TestSavepointTriggerHandler(triggerHandlerLogic))
                 .withHandler(new TestSavepointHandler(savepointHandlerLogic))
                 .buildAndStart();
@@ -355,7 +349,7 @@ public class RestClusterClientSavepointTriggerTest extends 
TestLogger {
         clientConfig.setInteger(RestOptions.PORT, port);
         return new RestClusterClient<>(
                 clientConfig,
-                new 
RestClient(RestClientConfiguration.fromConfiguration(REST_CONFIG), executor),
+                new RestClient(REST_CONFIG, executor),
                 StandaloneClusterId.getInstance(),
                 (attempt) -> 0);
     }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 0cf2f9e..9c9a6ef 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -45,8 +45,6 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -154,8 +152,6 @@ public class RestClusterClientTest extends TestLogger {
 
     private GatewayRetriever<DispatcherGateway> mockGatewayRetriever;
 
-    private RestServerEndpointConfiguration restServerEndpointConfiguration;
-
     private volatile FailHttpRequestPredicate failHttpRequest = 
FailHttpRequestPredicate.never();
 
     private ExecutorService executor;
@@ -177,8 +173,6 @@ public class RestClusterClientTest extends TestLogger {
 
     @Before
     public void setUp() throws Exception {
-        restServerEndpointConfiguration =
-                RestServerEndpointConfiguration.fromConfiguration(restConfig);
         mockGatewayRetriever = () -> 
CompletableFuture.completedFuture(mockRestfulGateway);
 
         executor =
@@ -209,7 +203,7 @@ public class RestClusterClientTest extends TestLogger {
 
     @Nonnull
     private RestClient createRestClient() throws ConfigurationException {
-        return new 
RestClient(RestClientConfiguration.fromConfiguration(restConfig), executor) {
+        return new RestClient(restConfig, executor) {
             @Override
             public <
                             M extends MessageHeaders<R, P, U>,
@@ -1048,8 +1042,7 @@ public class RestClusterClientTest extends TestLogger {
 
     private TestRestServerEndpoint createRestServerEndpoint(
             final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers) 
throws Exception {
-        TestRestServerEndpoint.Builder builder =
-                
TestRestServerEndpoint.builder(restServerEndpointConfiguration);
+        TestRestServerEndpoint.Builder builder = 
TestRestServerEndpoint.builder(restConfig);
         Arrays.stream(abstractRestHandlers).forEach(builder::withHandler);
 
         return builder.buildAndStart();
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index b16f1eb..e865eed 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.ConfigurationException;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString;
@@ -99,7 +100,7 @@ public class RestAPIDocGenerator {
      * @param args args[0] contains the directory into which the generated 
files are placed
      * @throws IOException if any file operation failed
      */
-    public static void main(String[] args) throws IOException {
+    public static void main(String[] args) throws IOException, 
ConfigurationException {
         String outputDirectory = args[0];
 
         for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index 27c11fd..429e4f5 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -21,7 +21,6 @@ package org.apache.flink.tests.util.flink;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
@@ -133,9 +132,7 @@ public class LocalStandaloneFlinkResource implements 
FlinkResource {
         distribution.startFlinkCluster();
 
         try (final RestClient restClient =
-                new RestClient(
-                        RestClientConfiguration.fromConfiguration(new 
Configuration()),
-                        Executors.directExecutor())) {
+                new RestClient(new Configuration(), 
Executors.directExecutor())) {
             for (int retryAttempt = 0; retryAttempt < 30; retryAttempt++) {
                 final CompletableFuture<TaskManagersInfo> localhost =
                         restClient.sendRequest(
diff --git 
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
 
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
index 7955110..cf42948 100644
--- 
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
+++ 
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
@@ -94,9 +93,7 @@ public class MetricsAvailabilityITCase extends TestLogger {
     public void testReporter() throws Exception {
         try (ClusterController ignored = dist.startCluster(1)) {
             final RestClient restClient =
-                    new RestClient(
-                            RestClientConfiguration.fromConfiguration(new 
Configuration()),
-                            scheduledExecutorService);
+                    new RestClient(new Configuration(), 
scheduledExecutorService);
 
             checkJobManagerMetricAvailability(restClient);
 
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index d6fc7e4..543a48b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -18,18 +18,22 @@
 
 package org.apache.flink.runtime.webmonitor.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
 import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
 import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
 import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
@@ -45,7 +49,14 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
 
 /** This classes encapsulates the boot-strapping of netty for the 
web-frontend. */
 public class WebFrontendBootstrap {
@@ -55,6 +66,8 @@ public class WebFrontendBootstrap {
     private final ServerBootstrap bootstrap;
     private final Channel serverChannel;
     private final String restAddress;
+    private final Map<String, String> responseHeaders;
+    @VisibleForTesting List<InboundChannelHandlerFactory> 
inboundChannelHandlerFactories;
 
     public WebFrontendBootstrap(
             Router router,
@@ -69,15 +82,34 @@ public class WebFrontendBootstrap {
         this.router = Preconditions.checkNotNull(router);
         this.log = Preconditions.checkNotNull(log);
         this.uploadDir = directory;
+        this.responseHeaders = new HashMap<>();
+        inboundChannelHandlerFactories = new ArrayList<>();
+        ServiceLoader<InboundChannelHandlerFactory> loader =
+                ServiceLoader.load(InboundChannelHandlerFactory.class);
+        final Iterator<InboundChannelHandlerFactory> factories = 
loader.iterator();
+        while (factories.hasNext()) {
+            try {
+                final InboundChannelHandlerFactory factory = factories.next();
+                if (factory != null) {
+                    inboundChannelHandlerFactories.add(factory);
+                    log.info("Loaded channel inbound factory: {}", factory);
+                }
+            } catch (Throwable e) {
+                log.error("Could not load channel inbound factory.", e);
+                throw e;
+            }
+        }
+        inboundChannelHandlerFactories.sort(
+                
Comparator.comparingInt(InboundChannelHandlerFactory::priority).reversed());
 
         ChannelInitializer<SocketChannel> initializer =
                 new ChannelInitializer<SocketChannel>() {
 
                     @Override
-                    protected void initChannel(SocketChannel ch) {
+                    protected void initChannel(SocketChannel ch) throws 
ConfigurationException {
                         RouterHandler handler =
                                 new RouterHandler(
-                                        WebFrontendBootstrap.this.router, new 
HashMap<>());
+                                        WebFrontendBootstrap.this.router, 
responseHeaders);
 
                         // SSL should be the first handler in the pipeline
                         if (serverSSLFactory != null) {
@@ -87,8 +119,18 @@ public class WebFrontendBootstrap {
                                             
serverSSLFactory.createNettySSLHandler(ch.alloc()));
                         }
 
+                        ch.pipeline().addLast(new HttpServerCodec());
+
+                        for (InboundChannelHandlerFactory factory :
+                                inboundChannelHandlerFactories) {
+                            Optional<ChannelHandler> channelHandler =
+                                    factory.createHandler(config, 
responseHeaders);
+                            if (channelHandler.isPresent()) {
+                                ch.pipeline().addLast(channelHandler.get());
+                            }
+                        }
+
                         ch.pipeline()
-                                .addLast(new HttpServerCodec())
                                 .addLast(new ChunkedWriteHandler())
                                 .addLast(new HttpRequestHandler(uploadDir))
                                 .addLast(handler.getName(), handler)
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
index e9eee95..3151f94 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -70,7 +71,7 @@ public final class RestAPIStabilityTest extends TestLogger {
     }
 
     @Test
-    public void testDispatcherRestAPIStability() throws IOException {
+    public void testDispatcherRestAPIStability() throws IOException, 
ConfigurationException {
         final String versionedSnapshotFileName =
                 String.format(SNAPSHOT_RESOURCE_PATTERN, 
apiVersion.getURLVersionPrefix());
 
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index e31e103..54108e0 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -424,7 +424,7 @@ public class HistoryServerTest extends TestLogger {
         env.execute();
     }
 
-    static Tuple2<Integer, String> getFromHTTP(String url) throws Exception {
+    public static Tuple2<Integer, String> getFromHTTP(String url) throws 
Exception {
         URL u = new URL(url);
         HttpURLConnection connection = (HttpURLConnection) u.openConnection();
         connection.setConnectTimeout(100000);
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
new file mode 100644
index 0000000..f58093f
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory;
+import 
org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import 
org.apache.flink.runtime.webmonitor.history.HistoryServerStaticFileServerHandler;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerTest;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for the WebFrontendBootstrap. */
+public class WebFrontendBootstrapTest {
+
+    @Rule public TemporaryFolder tmp = new TemporaryFolder();
+
+    @Test
+    public void testHandlersMustBeLoaded() throws Exception {
+        File webDir = tmp.newFolder("webDir");
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                Prio0InboundChannelHandlerFactory.REDIRECT_FROM_URL, 
"/nonExisting");
+        
configuration.setString(Prio0InboundChannelHandlerFactory.REDIRECT_TO_URL, 
"/index.html");
+        Router router =
+                new Router().addGet("/:*", new 
HistoryServerStaticFileServerHandler(webDir));
+        WebFrontendBootstrap webUI =
+                new WebFrontendBootstrap(
+                        router,
+                        
LoggerFactory.getLogger(WebFrontendBootstrapTest.class),
+                        tmp.newFolder("uploadDir"),
+                        null,
+                        "localhost",
+                        0,
+                        configuration);
+
+        assertEquals(webUI.inboundChannelHandlerFactories.size(), 2);
+        assertTrue(
+                webUI.inboundChannelHandlerFactories.get(0)
+                        instanceof Prio1InboundChannelHandlerFactory);
+        assertTrue(
+                webUI.inboundChannelHandlerFactories.get(1)
+                        instanceof Prio0InboundChannelHandlerFactory);
+
+        int port = webUI.getServerPort();
+        try {
+            Tuple2<Integer, String> index =
+                    HistoryServerTest.getFromHTTP("http://localhost:" + port + "/index.html");
+            Assert.assertEquals(index.f0.intValue(), 200);
+            Assert.assertTrue(index.f1.contains("Apache Flink Web Dashboard"));
+
+            Tuple2<Integer, String> index2 =
+                    HistoryServerTest.getFromHTTP("http://localhost:" + port + "/nonExisting");
+            Assert.assertEquals(index2.f0.intValue(), 200);
+            Assert.assertEquals(index, index2);
+        } finally {
+            webUI.shutdown();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 29483bf..e241437 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -53,7 +53,6 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
     private WebMonitorExtension webSubmissionExtension;
 
     public DispatcherRestEndpoint(
-            RestServerEndpointConfiguration endpointConfiguration,
             GatewayRetriever<DispatcherGateway> leaderRetriever,
             Configuration clusterConfiguration,
             RestHandlerConfiguration restConfiguration,
@@ -64,10 +63,9 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
             LeaderElectionService leaderElectionService,
             ExecutionGraphCache executionGraphCache,
             FatalErrorHandler fatalErrorHandler)
-            throws IOException {
+            throws IOException, ConfigurationException {
 
         super(
-                endpointConfiguration,
                 leaderRetriever,
                 clusterConfiguration,
                 restConfiguration,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundChannelHandlerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundChannelHandlerFactory.java
new file mode 100644
index 0000000..4316224
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundChannelHandlerFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Custom netty inbound handler factory in order to make custom changes on 
netty inbound data. Good
+ * example usage of this API is custom authentication. When the user is not 
authenticated then the
+ * instantiated channel handler can send back 401 to trigger negotiation. 
Since implementations are
+ * loaded with service loader it's discouraged to store any internal state in 
factories.
+ */
+@Experimental
+public interface InboundChannelHandlerFactory {
+    /**
+     * Gives back priority of the {@link ChannelHandler}. The bigger the value 
is, the earlier it is
+     * executed. If multiple handlers have the same priority then the order is 
not defined.
+     *
+     * @return the priority of the {@link ChannelHandler}.
+     */
+    int priority();
+
+    /**
+     * Creates a new instance of {@link ChannelHandler}.
+     *
+     * @param configuration The Flink {@link Configuration}.
+     * @param responseHeaders The response headers.
+     * @return {@link ChannelHandler} wrapped in an {@link Optional}, or empty if none is needed.
+     * @throws ConfigurationException Thrown, if the handler configuration is 
incorrect.
+     */
+    Optional<ChannelHandler> createHandler(
+            Configuration configuration, Map<String, String> responseHeaders)
+            throws ConfigurationException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundChannelHandlerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundChannelHandlerFactory.java
new file mode 100644
index 0000000..564394d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundChannelHandlerFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import java.util.Optional;
+
+/**
+ * Custom netty outbound handler factory in order to make custom changes on 
netty outbound data.
+ * Good example usage of this API is custom authentication. When the user 
wants to send
+ * authentication information then the instantiated channel handler can modify 
the HTTP request.
+ * Since implementations are loaded with service loader it's discouraged to 
store any internal state
+ * in factories.
+ */
+@Experimental
+public interface OutboundChannelHandlerFactory {
+    /**
+     * Gives back priority of the {@link ChannelHandler}. The bigger the value 
is, the earlier it is
+     * executed. If multiple handlers have the same priority then the order is 
not defined.
+     *
+     * @return the priority of the {@link ChannelHandler}.
+     */
+    int priority();
+
+    /**
+     * Creates a new instance of {@link ChannelHandler}.
+     *
+     * @param configuration The Flink {@link Configuration}.
+     * @return the {@link ChannelHandler}, or an empty {@link Optional} if none is needed.
+     * @throws ConfigurationException Thrown, if the handler configuration is 
incorrect.
+     */
+    Optional<ChannelHandler> createHandler(Configuration configuration)
+            throws ConfigurationException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
index 8482e49..5b0e494 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
 
 import java.io.IOException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -39,7 +39,6 @@ import java.util.concurrent.ScheduledExecutorService;
 public class MiniDispatcherRestEndpoint extends 
WebMonitorEndpoint<RestfulGateway> {
 
     public MiniDispatcherRestEndpoint(
-            RestServerEndpointConfiguration endpointConfiguration,
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Configuration clusterConfiguration,
             RestHandlerConfiguration restConfiguration,
@@ -50,9 +49,8 @@ public class MiniDispatcherRestEndpoint extends 
WebMonitorEndpoint<RestfulGatewa
             LeaderElectionService leaderElectionService,
             ExecutionGraphCache executionGraphCache,
             FatalErrorHandler fatalErrorHandler)
-            throws IOException {
+            throws IOException, ConfigurationException {
         super(
-                endpointConfiguration,
                 leaderRetriever,
                 clusterConfiguration,
                 restConfiguration,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
index f2db42f..9ce1d7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -52,7 +52,6 @@ public enum JobRestEndpointFactory implements 
RestEndpointFactory<RestfulGateway
                 RestHandlerConfiguration.fromConfiguration(configuration);
 
         return new MiniDispatcherRestEndpoint(
-                
RestServerEndpointConfiguration.fromConfiguration(configuration),
                 dispatcherGatewayRetriever,
                 configuration,
                 restHandlerConfiguration,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index cf75a95..4cb1eed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.runtime.rest;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory;
 import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -34,6 +37,7 @@ import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
@@ -51,6 +55,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
@@ -86,8 +91,14 @@ import java.io.InputStream;
 import java.io.StringWriter;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -111,12 +122,35 @@ public class RestClient implements AutoCloseableAsync {
 
     private final AtomicBoolean isRunning = new AtomicBoolean(true);
 
-    public RestClient(RestClientConfiguration configuration, Executor 
executor) {
+    @VisibleForTesting List<OutboundChannelHandlerFactory> 
outboundChannelHandlerFactories;
+
+    public RestClient(Configuration configuration, Executor executor)
+            throws ConfigurationException {
         Preconditions.checkNotNull(configuration);
         this.executor = Preconditions.checkNotNull(executor);
         this.terminationFuture = new CompletableFuture<>();
+        outboundChannelHandlerFactories = new ArrayList<>();
+        ServiceLoader<OutboundChannelHandlerFactory> loader =
+                ServiceLoader.load(OutboundChannelHandlerFactory.class);
+        final Iterator<OutboundChannelHandlerFactory> factories = 
loader.iterator();
+        while (factories.hasNext()) {
+            try {
+                final OutboundChannelHandlerFactory factory = factories.next();
+                if (factory != null) {
+                    outboundChannelHandlerFactories.add(factory);
+                    LOG.info("Loaded channel outbound factory: {}", factory);
+                }
+            } catch (Throwable e) {
+                LOG.error("Could not load channel outbound factory.", e);
+                throw e;
+            }
+        }
+        outboundChannelHandlerFactories.sort(
+                
Comparator.comparingInt(OutboundChannelHandlerFactory::priority).reversed());
 
-        final SSLHandlerFactory sslHandlerFactory = 
configuration.getSslHandlerFactory();
+        final RestClientConfiguration restConfiguration =
+                RestClientConfiguration.fromConfiguration(configuration);
+        final SSLHandlerFactory sslHandlerFactory = 
restConfiguration.getSslHandlerFactory();
         ChannelInitializer<SocketChannel> initializer =
                 new ChannelInitializer<SocketChannel>() {
                     @Override
@@ -137,14 +171,26 @@ public class RestClient implements AutoCloseableAsync {
                                     .addLast(new HttpClientCodec())
                                     .addLast(
                                             new HttpObjectAggregator(
-                                                    
configuration.getMaxContentLength()))
+                                                    
restConfiguration.getMaxContentLength()));
+
+                            for (OutboundChannelHandlerFactory factory :
+                                    outboundChannelHandlerFactories) {
+                                Optional<ChannelHandler> channelHandler =
+                                        factory.createHandler(configuration);
+                                if (channelHandler.isPresent()) {
+                                    
socketChannel.pipeline().addLast(channelHandler.get());
+                                }
+                            }
+
+                            socketChannel
+                                    .pipeline()
                                     .addLast(new ChunkedWriteHandler()) // 
required for
                                     // multipart-requests
                                     .addLast(
                                             new IdleStateHandler(
-                                                    
configuration.getIdlenessTimeout(),
-                                                    
configuration.getIdlenessTimeout(),
-                                                    
configuration.getIdlenessTimeout(),
+                                                    
restConfiguration.getIdlenessTimeout(),
+                                                    
restConfiguration.getIdlenessTimeout(),
+                                                    
restConfiguration.getIdlenessTimeout(),
                                                     TimeUnit.MILLISECONDS))
                                     .addLast(new ClientHandler());
                         } catch (Throwable t) {
@@ -160,7 +206,7 @@ public class RestClient implements AutoCloseableAsync {
         bootstrap
                 .option(
                         ChannelOption.CONNECT_TIMEOUT_MILLIS,
-                        Math.toIntExact(configuration.getConnectionTimeout()))
+                        
Math.toIntExact(restConfiguration.getConnectionTimeout()))
                 .group(group)
                 .channel(NioSocketChannel.class)
                 .handler(initializer);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 5ee0894..f11450e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
 import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
 import org.apache.flink.runtime.net.RedirectingSslHandler;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
@@ -30,6 +32,7 @@ import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.router.RouterHandler;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
@@ -40,6 +43,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrapConfig;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
@@ -61,6 +65,7 @@ import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -69,6 +74,8 @@ import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -81,6 +88,7 @@ public abstract class RestServerEndpoint implements 
AutoCloseableAsync {
 
     private final Object lock = new Object();
 
+    private final Configuration configuration;
     private final String restAddress;
     private final String restBindAddress;
     private final String restBindPortRange;
@@ -99,21 +107,47 @@ public abstract class RestServerEndpoint implements 
AutoCloseableAsync {
 
     private State state = State.CREATED;
 
-    public RestServerEndpoint(RestServerEndpointConfiguration configuration) 
throws IOException {
+    @VisibleForTesting List<InboundChannelHandlerFactory> 
inboundChannelHandlerFactories;
+
+    public RestServerEndpoint(Configuration configuration)
+            throws IOException, ConfigurationException {
         Preconditions.checkNotNull(configuration);
+        RestServerEndpointConfiguration restConfiguration =
+                
RestServerEndpointConfiguration.fromConfiguration(configuration);
+        Preconditions.checkNotNull(restConfiguration);
 
-        this.restAddress = configuration.getRestAddress();
-        this.restBindAddress = configuration.getRestBindAddress();
-        this.restBindPortRange = configuration.getRestBindPortRange();
-        this.sslHandlerFactory = configuration.getSslHandlerFactory();
+        this.configuration = configuration;
+        this.restAddress = restConfiguration.getRestAddress();
+        this.restBindAddress = restConfiguration.getRestBindAddress();
+        this.restBindPortRange = restConfiguration.getRestBindPortRange();
+        this.sslHandlerFactory = restConfiguration.getSslHandlerFactory();
 
-        this.uploadDir = configuration.getUploadDir();
+        this.uploadDir = restConfiguration.getUploadDir();
         createUploadDir(uploadDir, log, true);
 
-        this.maxContentLength = configuration.getMaxContentLength();
-        this.responseHeaders = configuration.getResponseHeaders();
+        this.maxContentLength = restConfiguration.getMaxContentLength();
+        this.responseHeaders = restConfiguration.getResponseHeaders();
 
         terminationFuture = new CompletableFuture<>();
+
+        inboundChannelHandlerFactories = new ArrayList<>();
+        ServiceLoader<InboundChannelHandlerFactory> loader =
+                ServiceLoader.load(InboundChannelHandlerFactory.class);
+        final Iterator<InboundChannelHandlerFactory> factories = 
loader.iterator();
+        while (factories.hasNext()) {
+            try {
+                final InboundChannelHandlerFactory factory = factories.next();
+                if (factory != null) {
+                    inboundChannelHandlerFactories.add(factory);
+                    log.info("Loaded channel inbound factory: {}", factory);
+                }
+            } catch (Throwable e) {
+                log.error("Could not load channel inbound factory.", e);
+                throw e;
+            }
+        }
+        inboundChannelHandlerFactories.sort(
+                
Comparator.comparingInt(InboundChannelHandlerFactory::priority).reversed());
     }
 
     /**
@@ -159,7 +193,7 @@ public abstract class RestServerEndpoint implements 
AutoCloseableAsync {
                     new ChannelInitializer<SocketChannel>() {
 
                         @Override
-                        protected void initChannel(SocketChannel ch) {
+                        protected void initChannel(SocketChannel ch) throws 
ConfigurationException {
                             RouterHandler handler = new RouterHandler(router, 
responseHeaders);
 
                             // SSL should be the first handler in the pipeline
@@ -173,8 +207,18 @@ public abstract class RestServerEndpoint implements 
AutoCloseableAsync {
                                                         sslHandlerFactory));
                             }
 
+                            ch.pipeline().addLast(new HttpServerCodec());
+
+                            for (InboundChannelHandlerFactory factory :
+                                    inboundChannelHandlerFactories) {
+                                Optional<ChannelHandler> channelHandler =
+                                        factory.createHandler(configuration, 
responseHeaders);
+                                if (channelHandler.isPresent()) {
+                                    
ch.pipeline().addLast(channelHandler.get());
+                                }
+                            }
+
                             ch.pipeline()
-                                    .addLast(new HttpServerCodec())
                                     .addLast(new FileUploadHandler(uploadDir))
                                     .addLast(
                                             new FlinkHttpObjectAggregator(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index 9a39a2c..4b5b553 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -51,7 +51,6 @@ public enum SessionRestEndpointFactory implements 
RestEndpointFactory<Dispatcher
                 RestHandlerConfiguration.fromConfiguration(configuration);
 
         return new DispatcherRestEndpoint(
-                
RestServerEndpointConfiguration.fromConfiguration(configuration),
                 dispatcherGatewayRetriever,
                 configuration,
                 restHandlerConfiguration,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
index d4d7a6a..94da100 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
@@ -123,7 +123,7 @@ public class RouterHandler extends 
SimpleChannelInboundHandler<HttpRequest> {
         HandlerUtils.sendErrorResponse(
                 channelHandlerContext,
                 request,
-                new ErrorResponseBody("Not found."),
+                new ErrorResponseBody("Not found: " + request.uri()),
                 HttpResponseStatus.NOT_FOUND,
                 responseHeaders);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5071e4e..9ce6aeb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
@@ -139,6 +138,7 @@ import 
org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
 import 
org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
 import 
org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTrackerBuilder;
 import 
org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FileUtils;
@@ -197,7 +197,6 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
     @Nullable private ScheduledFuture<?> executionGraphCleanupTask;
 
     public WebMonitorEndpoint(
-            RestServerEndpointConfiguration endpointConfiguration,
             GatewayRetriever<? extends T> leaderRetriever,
             Configuration clusterConfiguration,
             RestHandlerConfiguration restConfiguration,
@@ -208,8 +207,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
             LeaderElectionService leaderElectionService,
             ExecutionGraphCache executionGraphCache,
             FatalErrorHandler fatalErrorHandler)
-            throws IOException {
-        super(endpointConfiguration);
+            throws IOException, ConfigurationException {
+        super(clusterConfiguration);
         this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
         this.clusterConfiguration = 
Preconditions.checkNotNull(clusterConfiguration);
         this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio0InboundChannelHandlerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio0InboundChannelHandlerFactory.java
new file mode 100644
index 0000000..97fe486
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio0InboundChannelHandlerFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Test inbound channel handler factory. */
+public class Prio0InboundChannelHandlerFactory implements 
InboundChannelHandlerFactory {
+    public static final ConfigOption<String> REDIRECT_FROM_URL =
+            
ConfigOptions.key("test.in.redirect.from.url").stringType().defaultValue("");
+    public static final ConfigOption<String> REDIRECT_TO_URL =
+            
ConfigOptions.key("test.in.redirect.to.url").stringType().defaultValue("");
+
+    @Override
+    public int priority() {
+        return 0;
+    }
+
+    @Override
+    public Optional<ChannelHandler> createHandler(
+            Configuration configuration, Map<String, String> responseHeaders)
+            throws ConfigurationException {
+        String redirectFromUrl = configuration.getString(REDIRECT_FROM_URL);
+        String redirectToUrl = configuration.getString(REDIRECT_TO_URL);
+        if (!redirectFromUrl.isEmpty() && !redirectToUrl.isEmpty()) {
+            return Optional.of(
+                    new ChannelInboundHandlerAdapter() {
+                        @Override
+                        public void channelRead(ChannelHandlerContext ctx, 
Object msg) {
+                            if (msg instanceof HttpRequest) {
+                                HttpRequest httpRequest = (HttpRequest) msg;
+                                if (httpRequest.uri().equals(redirectFromUrl)) 
{
+                                    httpRequest.setUri(redirectToUrl);
+                                }
+                            }
+                            
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
+                        }
+                    });
+        }
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio1InboundChannelHandlerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio1InboundChannelHandlerFactory.java
new file mode 100644
index 0000000..6f698bd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio1InboundChannelHandlerFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Test inbound channel handler factory. */
+public class Prio1InboundChannelHandlerFactory implements 
InboundChannelHandlerFactory {
+    @Override
+    public int priority() {
+        return 1;
+    }
+
+    @Override
+    public Optional<ChannelHandler> createHandler(
+            Configuration configuration, Map<String, String> responseHeaders)
+            throws ConfigurationException {
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0d4394b..9e5e4d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -105,9 +105,6 @@ public class MultipartUploadResource extends 
ExternalResource {
         configuredUploadDir = temporaryFolder.newFolder().toPath();
         config.setString(WebOptions.UPLOAD_DIR, 
configuredUploadDir.toString());
 
-        RestServerEndpointConfiguration serverConfig =
-                RestServerEndpointConfiguration.fromConfiguration(config);
-
         RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
 
         final GatewayRetriever<RestfulGateway> mockGatewayRetriever =
@@ -125,7 +122,7 @@ public class MultipartUploadResource extends 
ExternalResource {
         fileHandler = new MultipartFileHandler(mockGatewayRetriever);
 
         serverEndpoint =
-                TestRestServerEndpoint.builder(serverConfig)
+                TestRestServerEndpoint.builder(config)
                         .withHandler(mixedHandler)
                         .withHandler(jsonHandler)
                         .withHandler(fileHandler)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio0OutboundChannelHandlerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio0OutboundChannelHandlerFactory.java
new file mode 100644
index 0000000..398cd6c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio0OutboundChannelHandlerFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+
+import java.util.Optional;
+
+/**
+ * {@link OutboundChannelHandlerFactory} used in tests. When a non-empty target is configured via
+ * {@link #REDIRECT_TO_URL}, it produces a handler that rewrites the URI of every outgoing {@link
+ * HttpRequest} to that target; otherwise it contributes no handler.
+ */
+public class Prio0OutboundChannelHandlerFactory implements OutboundChannelHandlerFactory {
+    /** Target URI for outgoing requests; the empty default disables the rewrite. */
+    public static final ConfigOption<String> REDIRECT_TO_URL =
+            ConfigOptions.key("test.out.redirect.to.url").stringType().defaultValue("");
+
+    @Override
+    public int priority() {
+        return 0;
+    }
+
+    /**
+     * Builds the URI-rewriting handler if a redirect target is configured.
+     *
+     * @param configuration configuration from which {@link #REDIRECT_TO_URL} is read
+     * @return the rewriting handler, or an empty {@link Optional} when the option is empty
+     */
+    @Override
+    public Optional<ChannelHandler> createHandler(Configuration configuration)
+            throws ConfigurationException {
+        final String targetUrl = configuration.getString(REDIRECT_TO_URL);
+        if (targetUrl.isEmpty()) {
+            // Nothing configured: contribute no handler.
+            return Optional.empty();
+        }
+
+        final ChannelHandler uriRewriter =
+                new ChannelDuplexHandler() {
+                    @Override
+                    public void write(
+                            ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+                            throws Exception {
+                        // Only HTTP requests carry a URI; everything else passes through as is.
+                        if (msg instanceof HttpRequest) {
+                            ((HttpRequest) msg).setUri(targetUrl);
+                        }
+                        super.write(ctx, msg, promise);
+                    }
+                };
+        return Optional.of(uriRewriter);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio1OutboundChannelHandlerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio1OutboundChannelHandlerFactory.java
new file mode 100644
index 0000000..f44ea84
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio1OutboundChannelHandlerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import java.util.Optional;
+
+/**
+ * {@link OutboundChannelHandlerFactory} for tests that reports priority {@code 1} and never
+ * contributes a handler.
+ */
+public class Prio1OutboundChannelHandlerFactory implements OutboundChannelHandlerFactory {
+
+    /** Always yields an empty result, regardless of the given configuration. */
+    @Override
+    public Optional<ChannelHandler> createHandler(Configuration configuration)
+            throws ConfigurationException {
+        final Optional<ChannelHandler> noHandler = Optional.empty();
+        return noHandler;
+    }
+
+    @Override
+    public int priority() {
+        return 1;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
index 2ed7275..97b5992 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
@@ -50,10 +50,7 @@ public class RestClientMultipartTest extends TestLogger {
 
     @BeforeClass
     public static void setupClient() throws ConfigurationException {
-        restClient =
-                new RestClient(
-                        RestClientConfiguration.fromConfiguration(new 
Configuration()),
-                        TestingUtils.defaultExecutor());
+        restClient = new RestClient(new Configuration(), 
TestingUtils.defaultExecutor());
     }
 
     @After
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index a77961b..3b74857 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -61,10 +61,7 @@ public class RestClientTest extends TestLogger {
     public void testConnectionTimeout() throws Exception {
         final Configuration config = new Configuration();
         config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
-        try (final RestClient restClient =
-                new RestClient(
-                        RestClientConfiguration.fromConfiguration(config),
-                        Executors.directExecutor())) {
+        try (final RestClient restClient = new RestClient(config, 
Executors.directExecutor())) {
             restClient
                     .sendRequest(
                             unroutableIp,
@@ -83,9 +80,7 @@ public class RestClientTest extends TestLogger {
     @Test
     public void testInvalidVersionRejection() throws Exception {
         try (final RestClient restClient =
-                new RestClient(
-                        RestClientConfiguration.fromConfiguration(new 
Configuration()),
-                        Executors.directExecutor())) {
+                new RestClient(new Configuration(), 
Executors.directExecutor())) {
             CompletableFuture<EmptyResponseBody> invalidVersionResponse =
                     restClient.sendRequest(
                             unroutableIp,
@@ -108,9 +103,7 @@ public class RestClientTest extends TestLogger {
         config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
         try (final ServerSocket serverSocket = new ServerSocket(0);
                 final RestClient restClient =
-                        new RestClient(
-                                
RestClientConfiguration.fromConfiguration(config),
-                                TestingUtils.defaultExecutor())) {
+                        new RestClient(config, 
TestingUtils.defaultExecutor())) {
 
             final String targetAddress = "localhost";
             final int targetPort = serverSocket.getLocalPort();
@@ -162,9 +155,7 @@ public class RestClientTest extends TestLogger {
 
         try (final ServerSocket serverSocket = new ServerSocket(0);
                 final RestClient restClient =
-                        new RestClient(
-                                
RestClientConfiguration.fromConfiguration(config),
-                                TestingUtils.defaultExecutor())) {
+                        new RestClient(config, 
TestingUtils.defaultExecutor())) {
 
             final String targetAddress = "localhost";
             final int targetPort = serverSocket.getLocalPort();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
new file mode 100644
index 0000000..bc8dd31
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import 
org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory;
+import 
org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
+import org.apache.flink.runtime.testutils.TestingUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT case checking that custom channel handler factories are loaded by {@link RestClient} and
+ * {@link RestServerEndpoint} and that the handlers they create take part in request processing.
+ *
+ * <p>The configuration points the test outbound factory at {@code REDIRECT1_URL} and gives the
+ * test inbound factory a {@code REDIRECT1_URL} to {@code REDIRECT2_URL} mapping. No REST handler
+ * is registered on the server, so the request is expected to fail with an error that names the
+ * URL it finally arrived at.
+ */
+public class RestExternalHandlersITCase extends TestLogger {
+
+    private static final Time timeout = Time.seconds(10L);
+    private static final String REQUEST_URL = "/nonExisting1";
+    private static final String REDIRECT1_URL = "/nonExisting2";
+    private static final String REDIRECT2_URL = "/nonExisting3";
+
+    private RestServerEndpoint serverEndpoint;
+    private RestClient restClient;
+    private InetSocketAddress serverAddress;
+
+    private final Configuration config;
+
+    public RestExternalHandlersITCase() {
+        this.config = getBaseConfig();
+    }
+
+    /**
+     * Creates the configuration shared by client and server: loopback binding on an ephemeral
+     * port plus the redirect options read by the test handler factories.
+     */
+    private static Configuration getBaseConfig() {
+        final String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
+
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "0");
+        config.setString(RestOptions.BIND_ADDRESS, loopbackAddress);
+        config.setString(RestOptions.ADDRESS, loopbackAddress);
+        config.setString(Prio0OutboundChannelHandlerFactory.REDIRECT_TO_URL, REDIRECT1_URL);
+        config.setString(Prio0InboundChannelHandlerFactory.REDIRECT_FROM_URL, REDIRECT1_URL);
+        config.setString(Prio0InboundChannelHandlerFactory.REDIRECT_TO_URL, REDIRECT2_URL);
+        return config;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        // The endpoint is started without any REST handler on purpose; see the class Javadoc.
+        serverEndpoint = TestRestServerEndpoint.builder(config).buildAndStart();
+        restClient = new TestRestClient(config);
+        serverAddress = serverEndpoint.getServerAddress();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (restClient != null) {
+            restClient.shutdown(timeout);
+            restClient = null;
+        }
+
+        if (serverEndpoint != null) {
+            serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit());
+            serverEndpoint = null;
+        }
+    }
+
+    /**
+     * Verifies that both test factories are present on the server and on the client, in the
+     * order priority 1 before priority 0, and that a request ends up at {@code REDIRECT2_URL}.
+     */
+    @Test
+    public void testHandlersMustBeLoaded() throws Exception {
+        // JUnit's assertEquals takes (expected, actual); keeping that order makes a failure
+        // message report the right values.
+        assertEquals(2, serverEndpoint.inboundChannelHandlerFactories.size());
+        assertTrue(
+                serverEndpoint.inboundChannelHandlerFactories.get(0)
+                        instanceof Prio1InboundChannelHandlerFactory);
+        assertTrue(
+                serverEndpoint.inboundChannelHandlerFactories.get(1)
+                        instanceof Prio0InboundChannelHandlerFactory);
+
+        assertEquals(2, restClient.outboundChannelHandlerFactories.size());
+        assertTrue(
+                restClient.outboundChannelHandlerFactories.get(0)
+                        instanceof Prio1OutboundChannelHandlerFactory);
+        assertTrue(
+                restClient.outboundChannelHandlerFactories.get(1)
+                        instanceof Prio0OutboundChannelHandlerFactory);
+
+        try {
+            final CompletableFuture<TestResponse> response =
+                    sendRequestToTestHandler(new TestRequest());
+            response.get();
+            fail("Request must fail with 2 times redirected URL");
+        } catch (Exception e) {
+            // String.valueOf guards against a null message so that an unexpected failure shows
+            // up as a readable assertion error instead of a NullPointerException.
+            assertTrue(
+                    "Unexpected failure: " + e,
+                    String.valueOf(e.getMessage()).contains(REDIRECT2_URL));
+        }
+    }
+
+    /** Sends the given request to {@code REQUEST_URL} on the test server. */
+    private CompletableFuture<TestResponse> sendRequestToTestHandler(
+            final TestRequest testRequest) {
+        try {
+            return restClient.sendRequest(
+                    serverAddress.getHostName(),
+                    serverAddress.getPort(),
+                    new TestHeaders(),
+                    EmptyMessageParameters.getInstance(),
+                    testRequest);
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** {@link RestClient} running on the default testing executor. */
+    static class TestRestClient extends RestClient {
+
+        TestRestClient(Configuration configuration) throws ConfigurationException {
+            super(configuration, TestingUtils.defaultExecutor());
+        }
+    }
+
+    private static class TestRequest implements RequestBody {}
+
+    private static class TestResponse implements ResponseBody {}
+
+    /** Headers describing a POST to {@code REQUEST_URL} without message parameters. */
+    private static class TestHeaders
+            implements MessageHeaders<TestRequest, TestResponse, EmptyMessageParameters> {
+
+        @Override
+        public HttpMethodWrapper getHttpMethod() {
+            return HttpMethodWrapper.POST;
+        }
+
+        @Override
+        public String getTargetRestEndpointURL() {
+            return REQUEST_URL;
+        }
+
+        @Override
+        public Class<TestRequest> getRequestClass() {
+            return TestRequest.class;
+        }
+
+        @Override
+        public Class<TestResponse> getResponseClass() {
+            return TestResponse.class;
+        }
+
+        @Override
+        public HttpResponseStatus getResponseStatusCode() {
+            return HttpResponseStatus.OK;
+        }
+
+        @Override
+        public String getDescription() {
+            return "";
+        }
+
+        @Override
+        public EmptyMessageParameters getUnresolvedMessageParameters() {
+            return EmptyMessageParameters.getInstance();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 383e2df..1bd181e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.testutils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLogger;
@@ -195,10 +196,6 @@ public class RestServerEndpointITCase extends TestLogger {
             
HttpsURLConnection.setDefaultSSLSocketFactory(sslClientContext.getSocketFactory());
         }
 
-        RestServerEndpointConfiguration serverConfig =
-                RestServerEndpointConfiguration.fromConfiguration(config);
-        RestClientConfiguration clientConfig = 
RestClientConfiguration.fromConfiguration(config);
-
         RestfulGateway mockRestfulGateway = new 
TestingRestfulGateway.Builder().build();
 
         final GatewayRetriever<RestfulGateway> mockGatewayRetriever =
@@ -234,7 +231,7 @@ public class RestServerEndpointITCase extends TestLogger {
                         mockGatewayRetriever, RpcUtils.INF_TIMEOUT, 
temporaryFolder.getRoot());
 
         serverEndpoint =
-                TestRestServerEndpoint.builder(serverConfig)
+                TestRestServerEndpoint.builder(config)
                         .withHandler(new TestHeaders(), testHandler)
                         .withHandler(TestUploadHeaders.INSTANCE, 
testUploadHandler)
                         .withHandler(testVersionHandler)
@@ -244,7 +241,7 @@ public class RestServerEndpointITCase extends TestLogger {
                                 WebContentHandlerSpecification.getInstance(),
                                 staticFileServerHandler)
                         .buildAndStart();
-        restClient = new TestRestClient(clientConfig);
+        restClient = new TestRestClient(config);
 
         serverAddress = serverEndpoint.getServerAddress();
     }
@@ -639,13 +636,9 @@ public class RestServerEndpointITCase extends TestLogger {
         config.setString(RestOptions.ADDRESS, "localhost");
         config.setString(RestOptions.BIND_PORT, portRangeStart + "-" + 
portRangeEnd);
 
-        final RestServerEndpointConfiguration serverConfig =
-                RestServerEndpointConfiguration.fromConfiguration(config);
-
-        try (RestServerEndpoint serverEndpoint1 =
-                        TestRestServerEndpoint.builder(serverConfig).build();
+        try (RestServerEndpoint serverEndpoint1 = 
TestRestServerEndpoint.builder(config).build();
                 RestServerEndpoint serverEndpoint2 =
-                        TestRestServerEndpoint.builder(serverConfig).build()) {
+                        TestRestServerEndpoint.builder(config).build()) {
 
             serverEndpoint1.start();
             serverEndpoint2.start();
@@ -672,15 +665,12 @@ public class RestServerEndpointITCase extends TestLogger {
 
     @Test
     public void testEndpointsMustBeUnique() throws Exception {
-        final RestServerEndpointConfiguration serverConfig =
-                RestServerEndpointConfiguration.fromConfiguration(config);
-
         assertThrows(
                 "REST handler registration",
                 FlinkRuntimeException.class,
                 () -> {
                     try (TestRestServerEndpoint restServerEndpoint =
-                            TestRestServerEndpoint.builder(serverConfig)
+                            TestRestServerEndpoint.builder(config)
                                     .withHandler(new TestHeaders(), 
testHandler)
                                     .withHandler(new TestHeaders(), 
testUploadHandler)
                                     .build()) {
@@ -692,15 +682,12 @@ public class RestServerEndpointITCase extends TestLogger {
 
     @Test
     public void testDuplicateHandlerRegistrationIsForbidden() throws Exception 
{
-        final RestServerEndpointConfiguration serverConfig =
-                RestServerEndpointConfiguration.fromConfiguration(config);
-
         assertThrows(
                 "Duplicate REST handler",
                 FlinkRuntimeException.class,
                 () -> {
                     try (TestRestServerEndpoint restServerEndpoint =
-                            TestRestServerEndpoint.builder(serverConfig)
+                            TestRestServerEndpoint.builder(config)
                                     .withHandler(new TestHeaders(), 
testHandler)
                                     .withHandler(TestUploadHeaders.INSTANCE, 
testHandler)
                                     .build()) {
@@ -796,7 +783,7 @@ public class RestServerEndpointITCase extends TestLogger {
 
     static class TestRestClient extends RestClient {
 
-        TestRestClient(RestClientConfiguration configuration) {
+        TestRestClient(Configuration configuration) throws 
ConfigurationException {
             super(configuration, TestingUtils.defaultExecutor());
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
index c0274c9..950e338 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
@@ -104,11 +104,6 @@ public class RestServerSSLAuthITCase extends TestLogger {
         RestServerEndpoint serverEndpoint = null;
 
         try {
-            RestServerEndpointConfiguration restServerConfig =
-                    
RestServerEndpointConfiguration.fromConfiguration(serverConfig);
-            RestClientConfiguration restClientConfig =
-                    RestClientConfiguration.fromConfiguration(clientConfig);
-
             RestfulGateway restfulGateway = new 
TestingRestfulGateway.Builder().build();
             RestServerEndpointITCase.TestVersionHandler testVersionHandler =
                     new RestServerEndpointITCase.TestVersionHandler(
@@ -116,10 +111,10 @@ public class RestServerSSLAuthITCase extends TestLogger {
                             RpcUtils.INF_TIMEOUT);
 
             serverEndpoint =
-                    TestRestServerEndpoint.builder(restServerConfig)
+                    TestRestServerEndpoint.builder(serverConfig)
                             
.withHandler(testVersionHandler.getMessageHeaders(), testVersionHandler)
                             .buildAndStart();
-            restClient = new 
RestServerEndpointITCase.TestRestClient(restClientConfig);
+            restClient = new 
RestServerEndpointITCase.TestRestClient(clientConfig);
 
             CompletableFuture<EmptyResponseBody> response =
                     restClient.sendRequest(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerITCase.java
index af1ef74..6bb1e2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerITCase.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.rest.handler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -77,8 +75,7 @@ public class AbstractHandlerITCase extends TestLogger {
         Configuration config = new Configuration(REST_BASE_CONFIG);
         config.setInteger(RestOptions.PORT, serverPort);
 
-        return new RestClient(
-                RestClientConfiguration.fromConfiguration(config), 
Executors.directExecutor());
+        return new RestClient(config, Executors.directExecutor());
     }
 
     @Test
@@ -99,9 +96,7 @@ public class AbstractHandlerITCase extends TestLogger {
                                         new OutOfMemoryError("Metaspace")));
 
         try (final TestRestServerEndpoint server =
-                        TestRestServerEndpoint.builder(
-                                        
RestServerEndpointConfiguration.fromConfiguration(
-                                                REST_BASE_CONFIG))
+                        TestRestServerEndpoint.builder(REST_BASE_CONFIG)
                                 .withHandler(messageHeaders, testRestHandler)
                                 .buildAndStart();
                 final RestClient restClient =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
index 4f17cb1..b83f98a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
@@ -55,7 +54,6 @@ public class DocumentingDispatcherRestEndpoint extends 
DispatcherRestEndpoint
         implements DocumentingRestEndpoint {
 
     private static final Configuration config;
-    private static final RestServerEndpointConfiguration restConfig;
     private static final RestHandlerConfiguration handlerConfig;
     private static final GatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever;
     private static final GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever;
@@ -65,22 +63,14 @@ public class DocumentingDispatcherRestEndpoint extends 
DispatcherRestEndpoint
         config.setString(RestOptions.ADDRESS, "localhost");
         // necessary for loading the web-submission extension
         config.setString(JobManagerOptions.ADDRESS, "localhost");
-        try {
-            restConfig = 
RestServerEndpointConfiguration.fromConfiguration(config);
-        } catch (ConfigurationException e) {
-            throw new RuntimeException(
-                    "Implementation error. 
RestServerEndpointConfiguration#fromConfiguration failed for default 
configuration.",
-                    e);
-        }
         handlerConfig = RestHandlerConfiguration.fromConfiguration(config);
 
         dispatcherGatewayRetriever = () -> null;
         resourceManagerGatewayRetriever = () -> null;
     }
 
-    public DocumentingDispatcherRestEndpoint() throws IOException {
+    public DocumentingDispatcherRestEndpoint() throws IOException, 
ConfigurationException {
         super(
-                restConfig,
                 dispatcherGatewayRetriever,
                 config,
                 handlerConfig,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java
index e5443cf..c9b25fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java
@@ -19,10 +19,11 @@
 package org.apache.flink.runtime.rest.util;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.util.ConfigurationException;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
@@ -34,7 +35,7 @@ import java.util.concurrent.CompletableFuture;
 /** Utility {@link RestServerEndpoint} for setting up a rest server with a 
given set of handlers. */
 public class TestRestServerEndpoint extends RestServerEndpoint {
 
-    public static Builder builder(RestServerEndpointConfiguration 
configuration) {
+    public static Builder builder(Configuration configuration) {
         return new Builder(configuration);
     }
 
@@ -43,11 +44,11 @@ public class TestRestServerEndpoint extends 
RestServerEndpoint {
      */
     public static class Builder {
 
-        private final RestServerEndpointConfiguration configuration;
+        private final Configuration configuration;
         private final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers =
                 new ArrayList<>();
 
-        private Builder(RestServerEndpointConfiguration configuration) {
+        private Builder(Configuration configuration) {
             this.configuration = configuration;
         }
 
@@ -62,7 +63,7 @@ public class TestRestServerEndpoint extends 
RestServerEndpoint {
             return this;
         }
 
-        public TestRestServerEndpoint build() throws IOException {
+        public TestRestServerEndpoint build() throws IOException, 
ConfigurationException {
             return new TestRestServerEndpoint(configuration, handlers);
         }
 
@@ -77,9 +78,9 @@ public class TestRestServerEndpoint extends 
RestServerEndpoint {
     private final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers;
 
     private TestRestServerEndpoint(
-            final RestServerEndpointConfiguration configuration,
+            final Configuration configuration,
             final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers)
-            throws IOException {
+            throws IOException, ConfigurationException {
         super(configuration);
         this.handlers = handlers;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
index 9406869..bf5dcf3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.NoOpTransientBlobService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -56,7 +55,6 @@ public class WebMonitorEndpointTest extends TestLogger {
                         .build();
         try (final WebMonitorEndpoint<RestfulGateway> webMonitorEndpoint =
                 new WebMonitorEndpoint<>(
-                        RestServerEndpointConfiguration.fromConfiguration(configuration),
                         CompletableFuture::new,
                         configuration,
                         
RestHandlerConfiguration.fromConfiguration(configuration),
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory
new file mode 100644
index 0000000..09b801c
--- /dev/null
+++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory
+org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory
diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory
new file mode 100644
index 0000000..ea7be93
--- /dev/null
+++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.rest.Prio0OutboundChannelHandlerFactory
+org.apache.flink.runtime.rest.Prio1OutboundChannelHandlerFactory
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 3537adc..e602ba2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -48,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
@@ -759,8 +758,7 @@ public class SavepointITCase extends TestLogger {
         // ExecutionVertex
         final RestClient restClient =
                 new RestClient(
-                        RestClientConfiguration.fromConfiguration(
-                                new UnmodifiableConfiguration(new Configuration())),
+                        new UnmodifiableConfiguration(new Configuration()),
                         TestingUtils.defaultExecutor());
 
         final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index b9c707f..5c0225d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
@@ -568,9 +567,7 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 
         private RestClient createRestClient() throws ConfigurationException {
             return new RestClient(
-                    RestClientConfiguration.fromConfiguration(
-                            new UnmodifiableConfiguration(new Configuration())),
-                    executorService);
+                    new UnmodifiableConfiguration(new Configuration()), executorService);
         }
 
         private List<InternalTaskInfo> getInternalTaskInfos() {
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 88d34a0..d878661 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
@@ -134,10 +133,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
         startYARNWithConfig(YARN_CONFIGURATION);
 
         restClientExecutor = Executors.newSingleThreadExecutor();
-        restClient =
-                new RestClient(
-                        RestClientConfiguration.fromConfiguration(new Configuration()),
-                        restClientExecutor);
+        restClient = new RestClient(new Configuration(), restClientExecutor);
     }
 
     @AfterClass
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index d216b9e..641a0b5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
@@ -126,9 +125,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
                         final ApplicationId clusterId = clusterClient.getClusterId();
 
                         final RestClient restClient =
-                                new RestClient(
-                                        RestClientConfiguration.fromConfiguration(configuration),
-                                        TestingUtils.defaultExecutor());
+                                new RestClient(configuration, TestingUtils.defaultExecutor());
 
                         try {
                             final ApplicationReport applicationReport =

Reply via email to