This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 646ff2d [FLINK-21108][web] Add custom netty HTTP request
inbound/outbound handlers
646ff2d is described below
commit 646ff2d36f40704f5dca017b8fffed78bd51b307
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Jun 23 17:08:44 2021 +0200
[FLINK-21108][web] Add custom netty HTTP request inbound/outbound handlers
Closes #16463
---
.../client/program/rest/RestClusterClient.java | 5 +-
.../RestClusterClientSavepointTriggerTest.java | 10 +-
.../client/program/rest/RestClusterClientTest.java | 11 +-
.../flink/docs/rest/RestAPIDocGenerator.java | 3 +-
.../util/flink/LocalStandaloneFlinkResource.java | 5 +-
.../metrics/tests/MetricsAvailabilityITCase.java | 5 +-
.../webmonitor/utils/WebFrontendBootstrap.java | 48 +++++-
.../rest/compatibility/RestAPIStabilityTest.java | 3 +-
.../webmonitor/history/HistoryServerTest.java | 2 +-
.../webmonitor/utils/WebFrontendBootstrapTest.java | 87 ++++++++++
.../runtime/dispatcher/DispatcherRestEndpoint.java | 6 +-
.../netty/InboundChannelHandlerFactory.java | 57 ++++++
.../netty/OutboundChannelHandlerFactory.java | 55 ++++++
.../jobmaster/MiniDispatcherRestEndpoint.java | 6 +-
.../flink/runtime/rest/JobRestEndpointFactory.java | 1 -
.../org/apache/flink/runtime/rest/RestClient.java | 60 ++++++-
.../flink/runtime/rest/RestServerEndpoint.java | 64 +++++--
.../runtime/rest/SessionRestEndpointFactory.java | 1 -
.../runtime/rest/handler/router/RouterHandler.java | 2 +-
.../runtime/webmonitor/WebMonitorEndpoint.java | 7 +-
.../netty/Prio0InboundChannelHandlerFactory.java | 70 ++++++++
.../netty/Prio1InboundChannelHandlerFactory.java | 42 +++++
.../runtime/rest/MultipartUploadResource.java | 5 +-
.../rest/Prio0OutboundChannelHandlerFactory.java | 66 +++++++
.../rest/Prio1OutboundChannelHandlerFactory.java | 41 +++++
.../runtime/rest/RestClientMultipartTest.java | 5 +-
.../apache/flink/runtime/rest/RestClientTest.java | 17 +-
.../runtime/rest/RestExternalHandlersITCase.java | 192 +++++++++++++++++++++
.../runtime/rest/RestServerEndpointITCase.java | 29 +---
.../runtime/rest/RestServerSSLAuthITCase.java | 9 +-
.../rest/handler/AbstractHandlerITCase.java | 9 +-
.../util/DocumentingDispatcherRestEndpoint.java | 12 +-
.../runtime/rest/util/TestRestServerEndpoint.java | 15 +-
.../runtime/webmonitor/WebMonitorEndpointTest.java | 2 -
...e.io.network.netty.InboundChannelHandlerFactory | 17 ++
....io.network.netty.OutboundChannelHandlerFactory | 17 ++
.../flink/test/checkpointing/SavepointITCase.java | 4 +-
.../recovery/BatchFineGrainedRecoveryITCase.java | 5 +-
.../yarn/YARNSessionCapacitySchedulerITCase.java | 6 +-
.../apache/flink/yarn/YarnConfigurationITCase.java | 5 +-
40 files changed, 847 insertions(+), 159 deletions(-)
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 60a02ac..68418ef 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -207,10 +207,7 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
if (restClient != null) {
this.restClient = restClient;
} else {
- this.restClient =
- new RestClient(
-
restClusterClientConfiguration.getRestClientConfiguration(),
- executorService);
+ this.restClient = new RestClient(configuration, executorService);
}
this.waitStrategy = checkNotNull(waitStrategy);
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
index 65345b3..946c8dc 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java
@@ -26,9 +26,7 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.RestServerEndpoint;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -94,8 +92,6 @@ public class RestClusterClientSavepointTriggerTest extends
TestLogger {
private static final GatewayRetriever<DispatcherGateway>
mockGatewayRetriever =
() -> CompletableFuture.completedFuture(mockRestfulGateway);
- private static RestServerEndpointConfiguration
restServerEndpointConfiguration;
-
private static ExecutorService executor;
private static final Configuration REST_CONFIG;
@@ -112,8 +108,6 @@ public class RestClusterClientSavepointTriggerTest extends
TestLogger {
@BeforeClass
public static void setUp() throws ConfigurationException {
- restServerEndpointConfiguration =
- RestServerEndpointConfiguration.fromConfiguration(REST_CONFIG);
executor =
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory(
@@ -273,7 +267,7 @@ public class RestClusterClientSavepointTriggerTest extends
TestLogger {
final FunctionWithException<TriggerId, SavepointInfo,
RestHandlerException>
savepointHandlerLogic)
throws Exception {
- return TestRestServerEndpoint.builder(restServerEndpointConfiguration)
+ return TestRestServerEndpoint.builder(REST_CONFIG)
.withHandler(new
TestSavepointTriggerHandler(triggerHandlerLogic))
.withHandler(new TestSavepointHandler(savepointHandlerLogic))
.buildAndStart();
@@ -355,7 +349,7 @@ public class RestClusterClientSavepointTriggerTest extends
TestLogger {
clientConfig.setInteger(RestOptions.PORT, port);
return new RestClusterClient<>(
clientConfig,
- new
RestClient(RestClientConfiguration.fromConfiguration(REST_CONFIG), executor),
+ new RestClient(REST_CONFIG, executor),
StandaloneClusterId.getInstance(),
(attempt) -> 0);
}
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 0cf2f9e..9c9a6ef 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -45,8 +45,6 @@ import
org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -154,8 +152,6 @@ public class RestClusterClientTest extends TestLogger {
private GatewayRetriever<DispatcherGateway> mockGatewayRetriever;
- private RestServerEndpointConfiguration restServerEndpointConfiguration;
-
private volatile FailHttpRequestPredicate failHttpRequest =
FailHttpRequestPredicate.never();
private ExecutorService executor;
@@ -177,8 +173,6 @@ public class RestClusterClientTest extends TestLogger {
@Before
public void setUp() throws Exception {
- restServerEndpointConfiguration =
- RestServerEndpointConfiguration.fromConfiguration(restConfig);
mockGatewayRetriever = () ->
CompletableFuture.completedFuture(mockRestfulGateway);
executor =
@@ -209,7 +203,7 @@ public class RestClusterClientTest extends TestLogger {
@Nonnull
private RestClient createRestClient() throws ConfigurationException {
- return new
RestClient(RestClientConfiguration.fromConfiguration(restConfig), executor) {
+ return new RestClient(restConfig, executor) {
@Override
public <
M extends MessageHeaders<R, P, U>,
@@ -1048,8 +1042,7 @@ public class RestClusterClientTest extends TestLogger {
private TestRestServerEndpoint createRestServerEndpoint(
final AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlers)
throws Exception {
- TestRestServerEndpoint.Builder builder =
-
TestRestServerEndpoint.builder(restServerEndpointConfiguration);
+ TestRestServerEndpoint.Builder builder =
TestRestServerEndpoint.builder(restConfig);
Arrays.stream(abstractRestHandlers).forEach(builder::withHandler);
return builder.buildAndStart();
diff --git
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index b16f1eb..e865eed 100644
---
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -31,6 +31,7 @@ import
org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.ConfigurationException;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString;
@@ -99,7 +100,7 @@ public class RestAPIDocGenerator {
* @param args args[0] contains the directory into which the generated
files are placed
* @throws IOException if any file operation failed
*/
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws IOException,
ConfigurationException {
String outputDirectory = args[0];
for (final RestAPIVersion apiVersion : RestAPIVersion.values()) {
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index 27c11fd..429e4f5 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -21,7 +21,6 @@ package org.apache.flink.tests.util.flink;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
@@ -133,9 +132,7 @@ public class LocalStandaloneFlinkResource implements
FlinkResource {
distribution.startFlinkCluster();
try (final RestClient restClient =
- new RestClient(
- RestClientConfiguration.fromConfiguration(new
Configuration()),
- Executors.directExecutor())) {
+ new RestClient(new Configuration(),
Executors.directExecutor())) {
for (int retryAttempt = 0; retryAttempt < 30; retryAttempt++) {
final CompletableFuture<TaskManagersInfo> localhost =
restClient.sendRequest(
diff --git
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
index 7955110..cf42948 100644
---
a/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
+++
b/flink-end-to-end-tests/flink-metrics-availability-test/src/test/java/org/apache/flink/metrics/tests/MetricsAvailabilityITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import
org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
@@ -94,9 +93,7 @@ public class MetricsAvailabilityITCase extends TestLogger {
public void testReporter() throws Exception {
try (ClusterController ignored = dist.startCluster(1)) {
final RestClient restClient =
- new RestClient(
- RestClientConfiguration.fromConfiguration(new
Configuration()),
- scheduledExecutorService);
+ new RestClient(new Configuration(),
scheduledExecutorService);
checkJobManagerMetricAvailability(restClient);
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index d6fc7e4..543a48b 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -18,18 +18,22 @@
package org.apache.flink.runtime.webmonitor.utils;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
@@ -45,7 +49,14 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
/** This classes encapsulates the boot-strapping of netty for the
web-frontend. */
public class WebFrontendBootstrap {
@@ -55,6 +66,8 @@ public class WebFrontendBootstrap {
private final ServerBootstrap bootstrap;
private final Channel serverChannel;
private final String restAddress;
+ private final Map<String, String> responseHeaders;
+ @VisibleForTesting List<InboundChannelHandlerFactory>
inboundChannelHandlerFactories;
public WebFrontendBootstrap(
Router router,
@@ -69,15 +82,34 @@ public class WebFrontendBootstrap {
this.router = Preconditions.checkNotNull(router);
this.log = Preconditions.checkNotNull(log);
this.uploadDir = directory;
+ this.responseHeaders = new HashMap<>();
+ inboundChannelHandlerFactories = new ArrayList<>();
+ ServiceLoader<InboundChannelHandlerFactory> loader =
+ ServiceLoader.load(InboundChannelHandlerFactory.class);
+ final Iterator<InboundChannelHandlerFactory> factories =
loader.iterator();
+ while (factories.hasNext()) {
+ try {
+ final InboundChannelHandlerFactory factory = factories.next();
+ if (factory != null) {
+ inboundChannelHandlerFactories.add(factory);
+ log.info("Loaded channel inbound factory: {}", factory);
+ }
+ } catch (Throwable e) {
+ log.error("Could not load channel inbound factory.", e);
+ throw e;
+ }
+ }
+ inboundChannelHandlerFactories.sort(
+
Comparator.comparingInt(InboundChannelHandlerFactory::priority).reversed());
ChannelInitializer<SocketChannel> initializer =
new ChannelInitializer<SocketChannel>() {
@Override
- protected void initChannel(SocketChannel ch) {
+ protected void initChannel(SocketChannel ch) throws
ConfigurationException {
RouterHandler handler =
new RouterHandler(
- WebFrontendBootstrap.this.router, new
HashMap<>());
+ WebFrontendBootstrap.this.router,
responseHeaders);
// SSL should be the first handler in the pipeline
if (serverSSLFactory != null) {
@@ -87,8 +119,18 @@ public class WebFrontendBootstrap {
serverSSLFactory.createNettySSLHandler(ch.alloc()));
}
+ ch.pipeline().addLast(new HttpServerCodec());
+
+ for (InboundChannelHandlerFactory factory :
+ inboundChannelHandlerFactories) {
+ Optional<ChannelHandler> channelHandler =
+ factory.createHandler(config,
responseHeaders);
+ if (channelHandler.isPresent()) {
+ ch.pipeline().addLast(channelHandler.get());
+ }
+ }
+
ch.pipeline()
- .addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
.addLast(new HttpRequestHandler(uploadDir))
.addLast(handler.getName(), handler)
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
index e9eee95..3151f94 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLogger;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -70,7 +71,7 @@ public final class RestAPIStabilityTest extends TestLogger {
}
@Test
- public void testDispatcherRestAPIStability() throws IOException {
+ public void testDispatcherRestAPIStability() throws IOException,
ConfigurationException {
final String versionedSnapshotFileName =
String.format(SNAPSHOT_RESOURCE_PATTERN,
apiVersion.getURLVersionPrefix());
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index e31e103..54108e0 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -424,7 +424,7 @@ public class HistoryServerTest extends TestLogger {
env.execute();
}
- static Tuple2<Integer, String> getFromHTTP(String url) throws Exception {
+ public static Tuple2<Integer, String> getFromHTTP(String url) throws
Exception {
URL u = new URL(url);
HttpURLConnection connection = (HttpURLConnection) u.openConnection();
connection.setConnectTimeout(100000);
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
new file mode 100644
index 0000000..f58093f
--- /dev/null
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrapTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory;
+import
org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import
org.apache.flink.runtime.webmonitor.history.HistoryServerStaticFileServerHandler;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerTest;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the WebFrontendBootstrap: checks that inbound channel handler
+ * factories are picked up, ordered, and take part in request handling.
+ */
+public class WebFrontendBootstrapTest {
+
+ @Rule public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Starts a {@link WebFrontendBootstrap} with a redirect configured from
+ * "/nonExisting" to "/index.html", asserts that exactly two inbound handler
+ * factories were loaded in the expected order, and then verifies that both
+ * URLs answer with HTTP 200 and the same response.
+ *
+ * @throws Exception if the temporary folders, the bootstrap or the HTTP
+ * requests fail.
+ */
+ @Test
+ public void testHandlersMustBeLoaded() throws Exception {
+ File webDir = tmp.newFolder("webDir");
+ Configuration configuration = new Configuration();
+ configuration.setString(
+ Prio0InboundChannelHandlerFactory.REDIRECT_FROM_URL,
"/nonExisting");
+
configuration.setString(Prio0InboundChannelHandlerFactory.REDIRECT_TO_URL,
"/index.html");
+ Router router =
+ new Router().addGet("/:*", new
HistoryServerStaticFileServerHandler(webDir));
+ WebFrontendBootstrap webUI =
+ new WebFrontendBootstrap(
+ router,
+
LoggerFactory.getLogger(WebFrontendBootstrapTest.class),
+ tmp.newFolder("uploadDir"),
+ null,
+ "localhost",
+ 0,
+ configuration);
+
+ // JUnit's assertEquals takes (expected, actual); keeping that order makes
+ // failure messages report the right values.
+ assertEquals(2, webUI.inboundChannelHandlerFactories.size());
+ // The Prio1 factory is expected ahead of the Prio0 factory in the list.
+ assertTrue(
+ webUI.inboundChannelHandlerFactories.get(0)
+ instanceof Prio1InboundChannelHandlerFactory);
+ assertTrue(
+ webUI.inboundChannelHandlerFactories.get(1)
+ instanceof Prio0InboundChannelHandlerFactory);
+
+ int port = webUI.getServerPort();
+ try {
+ Tuple2<Integer, String> index =
+ HistoryServerTest.getFromHTTP("http://localhost:" + port +
"/index.html");
+ Assert.assertEquals(200, index.f0.intValue());
+ Assert.assertTrue(index.f1.contains("Apache Flink Web Dashboard"));
+
+ // The redirected URL must produce the same response as the target URL.
+ Tuple2<Integer, String> index2 =
+ HistoryServerTest.getFromHTTP("http://localhost:" + port +
"/nonExisting");
+ Assert.assertEquals(200, index2.f0.intValue());
+ Assert.assertEquals(index, index2);
+ } finally {
+ // Always release the server port, even when an assertion fails.
+ webUI.shutdown();
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 29483bf..e241437 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -53,7 +53,6 @@ public class DispatcherRestEndpoint extends
WebMonitorEndpoint<DispatcherGateway
private WebMonitorExtension webSubmissionExtension;
public DispatcherRestEndpoint(
- RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever<DispatcherGateway> leaderRetriever,
Configuration clusterConfiguration,
RestHandlerConfiguration restConfiguration,
@@ -64,10 +63,9 @@ public class DispatcherRestEndpoint extends
WebMonitorEndpoint<DispatcherGateway
LeaderElectionService leaderElectionService,
ExecutionGraphCache executionGraphCache,
FatalErrorHandler fatalErrorHandler)
- throws IOException {
+ throws IOException, ConfigurationException {
super(
- endpointConfiguration,
leaderRetriever,
clusterConfiguration,
restConfiguration,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundChannelHandlerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundChannelHandlerFactory.java
new file mode 100644
index 0000000..4316224
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundChannelHandlerFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Custom netty inbound handler factory in order to make custom changes on
netty inbound data. A good
+ * example usage of this API is custom authentication. When the user is not
authenticated then the
+ * instantiated channel handler can send back 401 to trigger negotiation.
Since implementations are
+ * loaded with service loader it's discouraged to store any internal state in
factories.
+ */
+@Experimental
+public interface InboundChannelHandlerFactory {
+ /**
+ * Returns the priority of the {@link ChannelHandler}. The bigger the value
is, the earlier it is
+ * executed. If multiple handlers have the same priority then the order is
not defined.
+ *
+ * @return the priority of the {@link ChannelHandler}.
+ */
+ int priority();
+
+ /**
+ * Creates a new instance of {@link ChannelHandler}.
+ *
+ * @param configuration The Flink {@link Configuration}.
+ * @param responseHeaders The response headers.
+ * @return an {@link Optional} holding the {@link ChannelHandler}, or an empty
{@link Optional} if no custom handler needs to be created.
+ * @throws ConfigurationException Thrown, if the handler configuration is
incorrect.
+ */
+ Optional<ChannelHandler> createHandler(
+ Configuration configuration, Map<String, String> responseHeaders)
+ throws ConfigurationException;
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundChannelHandlerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundChannelHandlerFactory.java
new file mode 100644
index 0000000..564394d
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundChannelHandlerFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import java.util.Optional;
+
+/**
+ * Custom netty outbound handler factory in order to make custom changes on
netty outbound data.
+ * A good example usage of this API is custom authentication. When the user
wants to send
+ * authentication information then the instantiated channel handler can modify
the HTTP request.
+ * Since implementations are loaded with service loader it's discouraged to
store any internal state
+ * in factories.
+ */
+@Experimental
+public interface OutboundChannelHandlerFactory {
+ /**
+ * Returns the priority of the {@link ChannelHandler}. The bigger the value
is, the earlier it is
+ * executed. If multiple handlers have the same priority then the order is
not defined.
+ *
+ * @return the priority of the {@link ChannelHandler}.
+ */
+ int priority();
+
+ /**
+ * Creates a new instance of {@link ChannelHandler}.
+ *
+ * @param configuration The Flink {@link Configuration}.
+ * @return an {@link Optional} holding the {@link ChannelHandler}, or an empty
{@link Optional} if no custom handler needs to be created.
+ * @throws ConfigurationException Thrown, if the handler configuration is
incorrect.
+ */
+ Optional<ChannelHandler> createHandler(Configuration configuration)
+ throws ConfigurationException;
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
index 8482e49..5b0e494 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniDispatcherRestEndpoint.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
@@ -39,7 +39,6 @@ import java.util.concurrent.ScheduledExecutorService;
public class MiniDispatcherRestEndpoint extends
WebMonitorEndpoint<RestfulGateway> {
public MiniDispatcherRestEndpoint(
- RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Configuration clusterConfiguration,
RestHandlerConfiguration restConfiguration,
@@ -50,9 +49,8 @@ public class MiniDispatcherRestEndpoint extends
WebMonitorEndpoint<RestfulGatewa
LeaderElectionService leaderElectionService,
ExecutionGraphCache executionGraphCache,
FatalErrorHandler fatalErrorHandler)
- throws IOException {
+ throws IOException, ConfigurationException {
super(
- endpointConfiguration,
leaderRetriever,
clusterConfiguration,
restConfiguration,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
index f2db42f..9ce1d7f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -52,7 +52,6 @@ public enum JobRestEndpointFactory implements
RestEndpointFactory<RestfulGateway
RestHandlerConfiguration.fromConfiguration(configuration);
return new MiniDispatcherRestEndpoint(
-
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
configuration,
restHandlerConfiguration,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index cf75a95..4cb1eed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -18,9 +18,12 @@
package org.apache.flink.runtime.rest;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -34,6 +37,7 @@ import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
@@ -51,6 +55,7 @@ import
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
@@ -86,8 +91,14 @@ import java.io.InputStream;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -111,12 +122,35 @@ public class RestClient implements AutoCloseableAsync {
private final AtomicBoolean isRunning = new AtomicBoolean(true);
- public RestClient(RestClientConfiguration configuration, Executor
executor) {
+ @VisibleForTesting List<OutboundChannelHandlerFactory>
outboundChannelHandlerFactories;
+
+ public RestClient(Configuration configuration, Executor executor)
+ throws ConfigurationException {
Preconditions.checkNotNull(configuration);
this.executor = Preconditions.checkNotNull(executor);
this.terminationFuture = new CompletableFuture<>();
+ outboundChannelHandlerFactories = new ArrayList<>();
+ ServiceLoader<OutboundChannelHandlerFactory> loader =
+ ServiceLoader.load(OutboundChannelHandlerFactory.class);
+ final Iterator<OutboundChannelHandlerFactory> factories =
loader.iterator();
+ while (factories.hasNext()) {
+ try {
+ final OutboundChannelHandlerFactory factory = factories.next();
+ if (factory != null) {
+ outboundChannelHandlerFactories.add(factory);
+ LOG.info("Loaded channel outbound factory: {}", factory);
+ }
+ } catch (Throwable e) {
+ LOG.error("Could not load channel outbound factory.", e);
+ throw e;
+ }
+ }
+ outboundChannelHandlerFactories.sort(
+
Comparator.comparingInt(OutboundChannelHandlerFactory::priority).reversed());
- final SSLHandlerFactory sslHandlerFactory =
configuration.getSslHandlerFactory();
+ final RestClientConfiguration restConfiguration =
+ RestClientConfiguration.fromConfiguration(configuration);
+ final SSLHandlerFactory sslHandlerFactory =
restConfiguration.getSslHandlerFactory();
ChannelInitializer<SocketChannel> initializer =
new ChannelInitializer<SocketChannel>() {
@Override
@@ -137,14 +171,26 @@ public class RestClient implements AutoCloseableAsync {
.addLast(new HttpClientCodec())
.addLast(
new HttpObjectAggregator(
-
configuration.getMaxContentLength()))
+
restConfiguration.getMaxContentLength()));
+
+ for (OutboundChannelHandlerFactory factory :
+ outboundChannelHandlerFactories) {
+ Optional<ChannelHandler> channelHandler =
+ factory.createHandler(configuration);
+ if (channelHandler.isPresent()) {
+
socketChannel.pipeline().addLast(channelHandler.get());
+ }
+ }
+
+ socketChannel
+ .pipeline()
.addLast(new ChunkedWriteHandler()) //
required for
// multipart-requests
.addLast(
new IdleStateHandler(
-
configuration.getIdlenessTimeout(),
-
configuration.getIdlenessTimeout(),
-
configuration.getIdlenessTimeout(),
+
restConfiguration.getIdlenessTimeout(),
+
restConfiguration.getIdlenessTimeout(),
+
restConfiguration.getIdlenessTimeout(),
TimeUnit.MILLISECONDS))
.addLast(new ClientHandler());
} catch (Throwable t) {
@@ -160,7 +206,7 @@ public class RestClient implements AutoCloseableAsync {
bootstrap
.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS,
- Math.toIntExact(configuration.getConnectionTimeout()))
+
Math.toIntExact(restConfiguration.getConnectionTimeout()))
.group(group)
.channel(NioSocketChannel.class)
.handler(initializer);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 5ee0894..f11450e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.rest;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.net.RedirectingSslHandler;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
@@ -30,6 +32,7 @@ import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
@@ -40,6 +43,7 @@ import
org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrapConfig;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
@@ -61,6 +65,7 @@ import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -69,6 +74,8 @@ import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -81,6 +88,7 @@ public abstract class RestServerEndpoint implements
AutoCloseableAsync {
private final Object lock = new Object();
+ private final Configuration configuration;
private final String restAddress;
private final String restBindAddress;
private final String restBindPortRange;
@@ -99,21 +107,47 @@ public abstract class RestServerEndpoint implements
AutoCloseableAsync {
private State state = State.CREATED;
- public RestServerEndpoint(RestServerEndpointConfiguration configuration)
throws IOException {
+ @VisibleForTesting List<InboundChannelHandlerFactory>
inboundChannelHandlerFactories;
+
+ public RestServerEndpoint(Configuration configuration)
+ throws IOException, ConfigurationException {
Preconditions.checkNotNull(configuration);
+ RestServerEndpointConfiguration restConfiguration =
+
RestServerEndpointConfiguration.fromConfiguration(configuration);
+ Preconditions.checkNotNull(restConfiguration);
- this.restAddress = configuration.getRestAddress();
- this.restBindAddress = configuration.getRestBindAddress();
- this.restBindPortRange = configuration.getRestBindPortRange();
- this.sslHandlerFactory = configuration.getSslHandlerFactory();
+ this.configuration = configuration;
+ this.restAddress = restConfiguration.getRestAddress();
+ this.restBindAddress = restConfiguration.getRestBindAddress();
+ this.restBindPortRange = restConfiguration.getRestBindPortRange();
+ this.sslHandlerFactory = restConfiguration.getSslHandlerFactory();
- this.uploadDir = configuration.getUploadDir();
+ this.uploadDir = restConfiguration.getUploadDir();
createUploadDir(uploadDir, log, true);
- this.maxContentLength = configuration.getMaxContentLength();
- this.responseHeaders = configuration.getResponseHeaders();
+ this.maxContentLength = restConfiguration.getMaxContentLength();
+ this.responseHeaders = restConfiguration.getResponseHeaders();
terminationFuture = new CompletableFuture<>();
+
+ inboundChannelHandlerFactories = new ArrayList<>();
+ ServiceLoader<InboundChannelHandlerFactory> loader =
+ ServiceLoader.load(InboundChannelHandlerFactory.class);
+ final Iterator<InboundChannelHandlerFactory> factories =
loader.iterator();
+ while (factories.hasNext()) {
+ try {
+ final InboundChannelHandlerFactory factory = factories.next();
+ if (factory != null) {
+ inboundChannelHandlerFactories.add(factory);
+ log.info("Loaded channel inbound factory: {}", factory);
+ }
+ } catch (Throwable e) {
+ log.error("Could not load channel inbound factory.", e);
+ throw e;
+ }
+ }
+ inboundChannelHandlerFactories.sort(
+
Comparator.comparingInt(InboundChannelHandlerFactory::priority).reversed());
}
/**
@@ -159,7 +193,7 @@ public abstract class RestServerEndpoint implements
AutoCloseableAsync {
new ChannelInitializer<SocketChannel>() {
@Override
- protected void initChannel(SocketChannel ch) {
+ protected void initChannel(SocketChannel ch) throws
ConfigurationException {
RouterHandler handler = new RouterHandler(router,
responseHeaders);
// SSL should be the first handler in the pipeline
@@ -173,8 +207,18 @@ public abstract class RestServerEndpoint implements
AutoCloseableAsync {
sslHandlerFactory));
}
+ ch.pipeline().addLast(new HttpServerCodec());
+
+ for (InboundChannelHandlerFactory factory :
+ inboundChannelHandlerFactories) {
+ Optional<ChannelHandler> channelHandler =
+ factory.createHandler(configuration,
responseHeaders);
+ if (channelHandler.isPresent()) {
+
ch.pipeline().addLast(channelHandler.get());
+ }
+ }
+
ch.pipeline()
- .addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir))
.addLast(
new FlinkHttpObjectAggregator(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
index 9a39a2c..4b5b553 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -51,7 +51,6 @@ public enum SessionRestEndpointFactory implements
RestEndpointFactory<Dispatcher
RestHandlerConfiguration.fromConfiguration(configuration);
return new DispatcherRestEndpoint(
-
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
configuration,
restHandlerConfiguration,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
index d4d7a6a..94da100 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
@@ -123,7 +123,7 @@ public class RouterHandler extends
SimpleChannelInboundHandler<HttpRequest> {
HandlerUtils.sendErrorResponse(
channelHandlerContext,
request,
- new ErrorResponseBody("Not found."),
+ new ErrorResponseBody("Not found: " + request.uri()),
HttpResponseStatus.NOT_FOUND,
responseHeaders);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5071e4e..9ce6aeb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -29,7 +29,6 @@ import
org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpoint;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
@@ -139,6 +138,7 @@ import
org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
import
org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
import
org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTrackerBuilder;
import
org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FileUtils;
@@ -197,7 +197,6 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
@Nullable private ScheduledFuture<?> executionGraphCleanupTask;
public WebMonitorEndpoint(
- RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever<? extends T> leaderRetriever,
Configuration clusterConfiguration,
RestHandlerConfiguration restConfiguration,
@@ -208,8 +207,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
LeaderElectionService leaderElectionService,
ExecutionGraphCache executionGraphCache,
FatalErrorHandler fatalErrorHandler)
- throws IOException {
- super(endpointConfiguration);
+ throws IOException, ConfigurationException {
+ super(clusterConfiguration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
this.clusterConfiguration =
Preconditions.checkNotNull(clusterConfiguration);
this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio0InboundChannelHandlerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio0InboundChannelHandlerFactory.java
new file mode 100644
index 0000000..97fe486
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio0InboundChannelHandlerFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Test inbound channel handler factory. */
+public class Prio0InboundChannelHandlerFactory implements
InboundChannelHandlerFactory {
+ public static final ConfigOption<String> REDIRECT_FROM_URL =
+
ConfigOptions.key("test.in.redirect.from.url").stringType().defaultValue("");
+ public static final ConfigOption<String> REDIRECT_TO_URL =
+
ConfigOptions.key("test.in.redirect.to.url").stringType().defaultValue("");
+
+ @Override
+ public int priority() {
+ return 0;
+ }
+
+ @Override
+ public Optional<ChannelHandler> createHandler(
+ Configuration configuration, Map<String, String> responseHeaders)
+ throws ConfigurationException {
+ String redirectFromUrl = configuration.getString(REDIRECT_FROM_URL);
+ String redirectToUrl = configuration.getString(REDIRECT_TO_URL);
+ if (!redirectFromUrl.isEmpty() && !redirectToUrl.isEmpty()) {
+ return Optional.of(
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx,
Object msg) {
+ if (msg instanceof HttpRequest) {
+ HttpRequest httpRequest = (HttpRequest) msg;
+ if (httpRequest.uri().equals(redirectFromUrl))
{
+ httpRequest.setUri(redirectToUrl);
+ }
+ }
+
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
+ }
+ });
+ }
+ return Optional.empty();
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio1InboundChannelHandlerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio1InboundChannelHandlerFactory.java
new file mode 100644
index 0000000..6f698bd
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/Prio1InboundChannelHandlerFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Test inbound channel handler factory. */
+public class Prio1InboundChannelHandlerFactory implements
InboundChannelHandlerFactory {
+ @Override
+ public int priority() {
+ return 1;
+ }
+
+ @Override
+ public Optional<ChannelHandler> createHandler(
+ Configuration configuration, Map<String, String> responseHeaders)
+ throws ConfigurationException {
+ return Optional.empty();
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 0d4394b..9e5e4d7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -105,9 +105,6 @@ public class MultipartUploadResource extends
ExternalResource {
configuredUploadDir = temporaryFolder.newFolder().toPath();
config.setString(WebOptions.UPLOAD_DIR,
configuredUploadDir.toString());
- RestServerEndpointConfiguration serverConfig =
- RestServerEndpointConfiguration.fromConfiguration(config);
-
RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
final GatewayRetriever<RestfulGateway> mockGatewayRetriever =
@@ -125,7 +122,7 @@ public class MultipartUploadResource extends
ExternalResource {
fileHandler = new MultipartFileHandler(mockGatewayRetriever);
serverEndpoint =
- TestRestServerEndpoint.builder(serverConfig)
+ TestRestServerEndpoint.builder(config)
.withHandler(mixedHandler)
.withHandler(jsonHandler)
.withHandler(fileHandler)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio0OutboundChannelHandlerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio0OutboundChannelHandlerFactory.java
new file mode 100644
index 0000000..398cd6c
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio0OutboundChannelHandlerFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+
+import java.util.Optional;
+
+/** Test outbound channel handler factory. */
+public class Prio0OutboundChannelHandlerFactory implements
OutboundChannelHandlerFactory {
+ public static final ConfigOption<String> REDIRECT_TO_URL =
+
ConfigOptions.key("test.out.redirect.to.url").stringType().defaultValue("");
+
+ @Override
+ public int priority() {
+ return 0;
+ }
+
+ @Override
+ public Optional<ChannelHandler> createHandler(Configuration configuration)
+ throws ConfigurationException {
+ String redirectToUrl = configuration.getString(REDIRECT_TO_URL);
+ if (!redirectToUrl.isEmpty()) {
+ return Optional.of(
+ new ChannelDuplexHandler() {
+ @Override
+ public void write(
+ ChannelHandlerContext ctx, Object msg,
ChannelPromise promise)
+ throws Exception {
+ if (msg instanceof HttpRequest) {
+ HttpRequest httpRequest = (HttpRequest) msg;
+ httpRequest.setUri(redirectToUrl);
+ }
+ super.write(ctx, msg, promise);
+ }
+ });
+ }
+ return Optional.empty();
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio1OutboundChannelHandlerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio1OutboundChannelHandlerFactory.java
new file mode 100644
index 0000000..f44ea84
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/Prio1OutboundChannelHandlerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory;
+import org.apache.flink.util.ConfigurationException;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import java.util.Optional;
+
+/** Test outbound channel handler factory. */
+public class Prio1OutboundChannelHandlerFactory implements
OutboundChannelHandlerFactory {
+ @Override
+ public int priority() {
+ return 1;
+ }
+
+ @Override
+ public Optional<ChannelHandler> createHandler(Configuration configuration)
+ throws ConfigurationException {
+ return Optional.empty();
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
index 2ed7275..97b5992 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientMultipartTest.java
@@ -50,10 +50,7 @@ public class RestClientMultipartTest extends TestLogger {
@BeforeClass
public static void setupClient() throws ConfigurationException {
- restClient =
- new RestClient(
- RestClientConfiguration.fromConfiguration(new
Configuration()),
- TestingUtils.defaultExecutor());
+ restClient = new RestClient(new Configuration(),
TestingUtils.defaultExecutor());
}
@After
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index a77961b..3b74857 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -61,10 +61,7 @@ public class RestClientTest extends TestLogger {
public void testConnectionTimeout() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
- try (final RestClient restClient =
- new RestClient(
- RestClientConfiguration.fromConfiguration(config),
- Executors.directExecutor())) {
+ try (final RestClient restClient = new RestClient(config,
Executors.directExecutor())) {
restClient
.sendRequest(
unroutableIp,
@@ -83,9 +80,7 @@ public class RestClientTest extends TestLogger {
@Test
public void testInvalidVersionRejection() throws Exception {
try (final RestClient restClient =
- new RestClient(
- RestClientConfiguration.fromConfiguration(new
Configuration()),
- Executors.directExecutor())) {
+ new RestClient(new Configuration(),
Executors.directExecutor())) {
CompletableFuture<EmptyResponseBody> invalidVersionResponse =
restClient.sendRequest(
unroutableIp,
@@ -108,9 +103,7 @@ public class RestClientTest extends TestLogger {
config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
try (final ServerSocket serverSocket = new ServerSocket(0);
final RestClient restClient =
- new RestClient(
-
RestClientConfiguration.fromConfiguration(config),
- TestingUtils.defaultExecutor())) {
+ new RestClient(config,
TestingUtils.defaultExecutor())) {
final String targetAddress = "localhost";
final int targetPort = serverSocket.getLocalPort();
@@ -162,9 +155,7 @@ public class RestClientTest extends TestLogger {
try (final ServerSocket serverSocket = new ServerSocket(0);
final RestClient restClient =
- new RestClient(
-
RestClientConfiguration.fromConfiguration(config),
- TestingUtils.defaultExecutor())) {
+ new RestClient(config,
TestingUtils.defaultExecutor())) {
final String targetAddress = "localhost";
final int targetPort = serverSocket.getLocalPort();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
new file mode 100644
index 0000000..bc8dd31
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestExternalHandlersITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import
org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory;
+import
org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
+import org.apache.flink.runtime.testutils.TestingUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT case verifying that custom inbound/outbound channel handler factories are loaded by
+ * {@link RestServerEndpoint} and {@link RestClient} and that they take part in request handling.
+ */
+public class RestExternalHandlersITCase extends TestLogger {
+
+ private static final Time timeout = Time.seconds(10L);
+ private static final String REQUEST_URL = "/nonExisting1";
+ private static final String REDIRECT1_URL = "/nonExisting2";
+ private static final String REDIRECT2_URL = "/nonExisting3";
+
+ private RestServerEndpoint serverEndpoint;
+ private RestClient restClient;
+ private InetSocketAddress serverAddress;
+
+ private final Configuration config;
+
+ public RestExternalHandlersITCase() {
+ this.config = getBaseConfig();
+ }
+
+ /**
+ * Builds the configuration shared by the server endpoint and the client: loopback address,
+ * an ephemeral bind port and the redirect URLs read by the test handler factories.
+ */
+ private static Configuration getBaseConfig() {
+ final String loopbackAddress =
InetAddress.getLoopbackAddress().getHostAddress();
+
+ final Configuration config = new Configuration();
+ config.setString(RestOptions.BIND_PORT, "0");
+ config.setString(RestOptions.BIND_ADDRESS, loopbackAddress);
+ config.setString(RestOptions.ADDRESS, loopbackAddress);
+ // NOTE(review): presumably the outbound (client) handler rewrites the request to
+ // REDIRECT1_URL and the inbound (server) handler rewrites REDIRECT1_URL to
+ // REDIRECT2_URL; confirm against the Prio0 factory implementations.
+ config.setString(Prio0OutboundChannelHandlerFactory.REDIRECT_TO_URL,
REDIRECT1_URL);
+ config.setString(Prio0InboundChannelHandlerFactory.REDIRECT_FROM_URL,
REDIRECT1_URL);
+ config.setString(Prio0InboundChannelHandlerFactory.REDIRECT_TO_URL,
REDIRECT2_URL);
+ return config;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ serverEndpoint =
TestRestServerEndpoint.builder(config).buildAndStart();
+ restClient = new TestRestClient(config);
+ serverAddress = serverEndpoint.getServerAddress();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (restClient != null) {
+ restClient.shutdown(timeout);
+ restClient = null;
+ }
+
+ if (serverEndpoint != null) {
+ serverEndpoint.closeAsync().get(timeout.getSize(),
timeout.getUnit());
+ serverEndpoint = null;
+ }
+ }
+
+ /**
+ * Checks that both test factories are loaded on each side, with the Prio1 factory listed
+ * before the Prio0 factory, and that a request to an unknown URL fails with an error
+ * mentioning the final redirect target.
+ */
+ @Test
+ public void testHandlersMustBeLoaded() throws Exception {
+ // JUnit's assertEquals takes (expected, actual); this order keeps failure messages correct.
+ assertEquals(2, serverEndpoint.inboundChannelHandlerFactories.size());
+ assertTrue(
+ serverEndpoint.inboundChannelHandlerFactories.get(0)
+ instanceof Prio1InboundChannelHandlerFactory);
+ assertTrue(
+ serverEndpoint.inboundChannelHandlerFactories.get(1)
+ instanceof Prio0InboundChannelHandlerFactory);
+
+ assertEquals(2, restClient.outboundChannelHandlerFactories.size());
+ assertTrue(
+ restClient.outboundChannelHandlerFactories.get(0)
+ instanceof Prio1OutboundChannelHandlerFactory);
+ assertTrue(
+ restClient.outboundChannelHandlerFactories.get(1)
+ instanceof Prio0OutboundChannelHandlerFactory);
+
+ try {
+ final CompletableFuture<TestResponse> response =
+ sendRequestToTestHandler(new TestRequest());
+ response.get();
+ // fail() throws an AssertionError, which the catch block below does not intercept.
+ fail("Request must fail with 2 times redirected URL");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains(REDIRECT2_URL));
+ }
+ }
+
+ /** Sends the given request to {@link #REQUEST_URL} on the test server endpoint. */
+ private CompletableFuture<TestResponse> sendRequestToTestHandler(
+ final TestRequest testRequest) {
+ try {
+ return restClient.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ new TestHeaders(),
+ EmptyMessageParameters.getInstance(),
+ testRequest);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** {@link RestClient} that runs on the default testing executor. */
+ static class TestRestClient extends RestClient {
+
+ TestRestClient(Configuration configuration) throws
ConfigurationException {
+ super(configuration, TestingUtils.defaultExecutor());
+ }
+ }
+
+ private static class TestRequest implements RequestBody {}
+
+ private static class TestResponse implements ResponseBody {}
+
+ /** Headers describing a POST to {@link #REQUEST_URL}, for which no handler is registered. */
+ private static class TestHeaders
+ implements MessageHeaders<TestRequest, TestResponse,
EmptyMessageParameters> {
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.POST;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return REQUEST_URL;
+ }
+
+ @Override
+ public Class<TestRequest> getRequestClass() {
+ return TestRequest.class;
+ }
+
+ @Override
+ public Class<TestResponse> getResponseClass() {
+ return TestResponse.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "";
+ }
+
+ @Override
+ public EmptyMessageParameters getUnresolvedMessageParameters() {
+ return EmptyMessageParameters.getInstance();
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 383e2df..1bd181e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
@@ -195,10 +196,6 @@ public class RestServerEndpointITCase extends TestLogger {
HttpsURLConnection.setDefaultSSLSocketFactory(sslClientContext.getSocketFactory());
}
- RestServerEndpointConfiguration serverConfig =
- RestServerEndpointConfiguration.fromConfiguration(config);
- RestClientConfiguration clientConfig =
RestClientConfiguration.fromConfiguration(config);
-
RestfulGateway mockRestfulGateway = new
TestingRestfulGateway.Builder().build();
final GatewayRetriever<RestfulGateway> mockGatewayRetriever =
@@ -234,7 +231,7 @@ public class RestServerEndpointITCase extends TestLogger {
mockGatewayRetriever, RpcUtils.INF_TIMEOUT,
temporaryFolder.getRoot());
serverEndpoint =
- TestRestServerEndpoint.builder(serverConfig)
+ TestRestServerEndpoint.builder(config)
.withHandler(new TestHeaders(), testHandler)
.withHandler(TestUploadHeaders.INSTANCE,
testUploadHandler)
.withHandler(testVersionHandler)
@@ -244,7 +241,7 @@ public class RestServerEndpointITCase extends TestLogger {
WebContentHandlerSpecification.getInstance(),
staticFileServerHandler)
.buildAndStart();
- restClient = new TestRestClient(clientConfig);
+ restClient = new TestRestClient(config);
serverAddress = serverEndpoint.getServerAddress();
}
@@ -639,13 +636,9 @@ public class RestServerEndpointITCase extends TestLogger {
config.setString(RestOptions.ADDRESS, "localhost");
config.setString(RestOptions.BIND_PORT, portRangeStart + "-" +
portRangeEnd);
- final RestServerEndpointConfiguration serverConfig =
- RestServerEndpointConfiguration.fromConfiguration(config);
-
- try (RestServerEndpoint serverEndpoint1 =
- TestRestServerEndpoint.builder(serverConfig).build();
+ try (RestServerEndpoint serverEndpoint1 =
TestRestServerEndpoint.builder(config).build();
RestServerEndpoint serverEndpoint2 =
- TestRestServerEndpoint.builder(serverConfig).build()) {
+ TestRestServerEndpoint.builder(config).build()) {
serverEndpoint1.start();
serverEndpoint2.start();
@@ -672,15 +665,12 @@ public class RestServerEndpointITCase extends TestLogger {
@Test
public void testEndpointsMustBeUnique() throws Exception {
- final RestServerEndpointConfiguration serverConfig =
- RestServerEndpointConfiguration.fromConfiguration(config);
-
assertThrows(
"REST handler registration",
FlinkRuntimeException.class,
() -> {
try (TestRestServerEndpoint restServerEndpoint =
- TestRestServerEndpoint.builder(serverConfig)
+ TestRestServerEndpoint.builder(config)
.withHandler(new TestHeaders(),
testHandler)
.withHandler(new TestHeaders(),
testUploadHandler)
.build()) {
@@ -692,15 +682,12 @@ public class RestServerEndpointITCase extends TestLogger {
@Test
public void testDuplicateHandlerRegistrationIsForbidden() throws Exception
{
- final RestServerEndpointConfiguration serverConfig =
- RestServerEndpointConfiguration.fromConfiguration(config);
-
assertThrows(
"Duplicate REST handler",
FlinkRuntimeException.class,
() -> {
try (TestRestServerEndpoint restServerEndpoint =
- TestRestServerEndpoint.builder(serverConfig)
+ TestRestServerEndpoint.builder(config)
.withHandler(new TestHeaders(),
testHandler)
.withHandler(TestUploadHeaders.INSTANCE,
testHandler)
.build()) {
@@ -796,7 +783,7 @@ public class RestServerEndpointITCase extends TestLogger {
static class TestRestClient extends RestClient {
- TestRestClient(RestClientConfiguration configuration) {
+ TestRestClient(Configuration configuration) throws
ConfigurationException {
super(configuration, TestingUtils.defaultExecutor());
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
index c0274c9..950e338 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
@@ -104,11 +104,6 @@ public class RestServerSSLAuthITCase extends TestLogger {
RestServerEndpoint serverEndpoint = null;
try {
- RestServerEndpointConfiguration restServerConfig =
-
RestServerEndpointConfiguration.fromConfiguration(serverConfig);
- RestClientConfiguration restClientConfig =
- RestClientConfiguration.fromConfiguration(clientConfig);
-
RestfulGateway restfulGateway = new
TestingRestfulGateway.Builder().build();
RestServerEndpointITCase.TestVersionHandler testVersionHandler =
new RestServerEndpointITCase.TestVersionHandler(
@@ -116,10 +111,10 @@ public class RestServerSSLAuthITCase extends TestLogger {
RpcUtils.INF_TIMEOUT);
serverEndpoint =
- TestRestServerEndpoint.builder(restServerConfig)
+ TestRestServerEndpoint.builder(serverConfig)
.withHandler(testVersionHandler.getMessageHeaders(), testVersionHandler)
.buildAndStart();
- restClient = new
RestServerEndpointITCase.TestRestClient(restClientConfig);
+ restClient = new
RestServerEndpointITCase.TestRestClient(clientConfig);
CompletableFuture<EmptyResponseBody> response =
restClient.sendRequest(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerITCase.java
index af1ef74..6bb1e2d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerITCase.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.rest.handler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -77,8 +75,7 @@ public class AbstractHandlerITCase extends TestLogger {
Configuration config = new Configuration(REST_BASE_CONFIG);
config.setInteger(RestOptions.PORT, serverPort);
- return new RestClient(
- RestClientConfiguration.fromConfiguration(config),
Executors.directExecutor());
+ return new RestClient(config, Executors.directExecutor());
}
@Test
@@ -99,9 +96,7 @@ public class AbstractHandlerITCase extends TestLogger {
new OutOfMemoryError("Metaspace")));
try (final TestRestServerEndpoint server =
- TestRestServerEndpoint.builder(
-
RestServerEndpointConfiguration.fromConfiguration(
- REST_BASE_CONFIG))
+ TestRestServerEndpoint.builder(REST_BASE_CONFIG)
.withHandler(messageHeaders, testRestHandler)
.buildAndStart();
final RestClient restClient =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
index 4f17cb1..b83f98a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
@@ -55,7 +54,6 @@ public class DocumentingDispatcherRestEndpoint extends
DispatcherRestEndpoint
implements DocumentingRestEndpoint {
private static final Configuration config;
- private static final RestServerEndpointConfiguration restConfig;
private static final RestHandlerConfiguration handlerConfig;
private static final GatewayRetriever<DispatcherGateway>
dispatcherGatewayRetriever;
private static final GatewayRetriever<ResourceManagerGateway>
resourceManagerGatewayRetriever;
@@ -65,22 +63,14 @@ public class DocumentingDispatcherRestEndpoint extends
DispatcherRestEndpoint
config.setString(RestOptions.ADDRESS, "localhost");
// necessary for loading the web-submission extension
config.setString(JobManagerOptions.ADDRESS, "localhost");
- try {
- restConfig =
RestServerEndpointConfiguration.fromConfiguration(config);
- } catch (ConfigurationException e) {
- throw new RuntimeException(
- "Implementation error.
RestServerEndpointConfiguration#fromConfiguration failed for default
configuration.",
- e);
- }
handlerConfig = RestHandlerConfiguration.fromConfiguration(config);
dispatcherGatewayRetriever = () -> null;
resourceManagerGatewayRetriever = () -> null;
}
- public DocumentingDispatcherRestEndpoint() throws IOException {
+ public DocumentingDispatcherRestEndpoint() throws IOException,
ConfigurationException {
super(
- restConfig,
dispatcherGatewayRetriever,
config,
handlerConfig,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java
index e5443cf..c9b25fe 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java
@@ -19,10 +19,11 @@
package org.apache.flink.runtime.rest.util;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.RestServerEndpoint;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
@@ -34,7 +35,7 @@ import java.util.concurrent.CompletableFuture;
/** Utility {@link RestServerEndpoint} for setting up a rest server with a
given set of handlers. */
public class TestRestServerEndpoint extends RestServerEndpoint {
- public static Builder builder(RestServerEndpointConfiguration
configuration) {
+ public static Builder builder(Configuration configuration) {
return new Builder(configuration);
}
@@ -43,11 +44,11 @@ public class TestRestServerEndpoint extends
RestServerEndpoint {
*/
public static class Builder {
- private final RestServerEndpointConfiguration configuration;
+ private final Configuration configuration;
private final List<Tuple2<RestHandlerSpecification,
ChannelInboundHandler>> handlers =
new ArrayList<>();
- private Builder(RestServerEndpointConfiguration configuration) {
+ private Builder(Configuration configuration) {
this.configuration = configuration;
}
@@ -62,7 +63,7 @@ public class TestRestServerEndpoint extends
RestServerEndpoint {
return this;
}
- public TestRestServerEndpoint build() throws IOException {
+ public TestRestServerEndpoint build() throws IOException,
ConfigurationException {
return new TestRestServerEndpoint(configuration, handlers);
}
@@ -77,9 +78,9 @@ public class TestRestServerEndpoint extends
RestServerEndpoint {
private final List<Tuple2<RestHandlerSpecification,
ChannelInboundHandler>> handlers;
private TestRestServerEndpoint(
- final RestServerEndpointConfiguration configuration,
+ final Configuration configuration,
final List<Tuple2<RestHandlerSpecification,
ChannelInboundHandler>> handlers)
- throws IOException {
+ throws IOException, ConfigurationException {
super(configuration);
this.handlers = handlers;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
index 9406869..bf5dcf3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpointTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.NoOpTransientBlobService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -56,7 +55,6 @@ public class WebMonitorEndpointTest extends TestLogger {
.build();
try (final WebMonitorEndpoint<RestfulGateway> webMonitorEndpoint =
new WebMonitorEndpoint<>(
-
RestServerEndpointConfiguration.fromConfiguration(configuration),
CompletableFuture::new,
configuration,
RestHandlerConfiguration.fromConfiguration(configuration),
diff --git
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory
new file mode 100644
index 0000000..09b801c
--- /dev/null
+++
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.InboundChannelHandlerFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.io.network.netty.Prio0InboundChannelHandlerFactory
+org.apache.flink.runtime.io.network.netty.Prio1InboundChannelHandlerFactory
diff --git
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory
new file mode 100644
index 0000000..ea7be93
--- /dev/null
+++
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.io.network.netty.OutboundChannelHandlerFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.rest.Prio0OutboundChannelHandlerFactory
+org.apache.flink.runtime.rest.Prio1OutboundChannelHandlerFactory
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 3537adc..e602ba2 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -48,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
@@ -759,8 +758,7 @@ public class SavepointITCase extends TestLogger {
// ExecutionVertex
final RestClient restClient =
new RestClient(
- RestClientConfiguration.fromConfiguration(
- new UnmodifiableConfiguration(new
Configuration())),
+ new UnmodifiableConfiguration(new Configuration()),
TestingUtils.defaultExecutor());
final JobDetailsHeaders detailsHeaders =
JobDetailsHeaders.getInstance();
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index b9c707f..5c0225d 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -35,7 +35,6 @@ import
org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
import
org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import
org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
@@ -568,9 +567,7 @@ public class BatchFineGrainedRecoveryITCase extends
TestLogger {
private RestClient createRestClient() throws ConfigurationException {
return new RestClient(
- RestClientConfiguration.fromConfiguration(
- new UnmodifiableConfiguration(new
Configuration())),
- executorService);
+ new UnmodifiableConfiguration(new Configuration()),
executorService);
}
private List<InternalTaskInfo> getInternalTaskInfos() {
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 88d34a0..d878661 100644
---
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
import
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
@@ -134,10 +133,7 @@ public class YARNSessionCapacitySchedulerITCase extends
YarnTestBase {
startYARNWithConfig(YARN_CONFIGURATION);
restClientExecutor = Executors.newSingleThreadExecutor();
- restClient =
- new RestClient(
- RestClientConfiguration.fromConfiguration(new
Configuration()),
- restClientExecutor);
+ restClient = new RestClient(new Configuration(), restClientExecutor);
}
@AfterClass
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index d216b9e..641a0b5 100644
---
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -32,7 +32,6 @@ import
org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
@@ -126,9 +125,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
final ApplicationId clusterId =
clusterClient.getClusterId();
final RestClient restClient =
- new RestClient(
-
RestClientConfiguration.fromConfiguration(configuration),
- TestingUtils.defaultExecutor());
+ new RestClient(configuration,
TestingUtils.defaultExecutor());
try {
final ApplicationReport applicationReport =