[FLINK-7521] Add config option to set the content length limit of REST server and client
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d0abd28 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d0abd28 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d0abd28 Branch: refs/heads/release-1.5 Commit: 3d0abd2824619cbf82ea4e91f74d859391a09d58 Parents: 9551c6f Author: zjureel <[email protected]> Authored: Thu Sep 7 10:39:39 2017 +0800 Committer: Till Rohrmann <[email protected]> Committed: Tue Mar 13 19:00:32 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/configuration/RestOptions.java | 15 ++++++++++++ .../apache/flink/runtime/rest/RestClient.java | 2 +- .../runtime/rest/RestClientConfiguration.java | 24 ++++++++++++++++++-- .../flink/runtime/rest/RestServerEndpoint.java | 5 +++- .../rest/RestServerEndpointConfiguration.java | 22 ++++++++++++++++-- 5 files changed, 62 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3d0abd28/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java index 888be08..61bb085 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java @@ -81,4 +81,19 @@ public class RestOptions { key("rest.connection-timeout") .defaultValue(15_000L) .withDescription("The maximum time in ms for the client to establish a TCP connection."); + + /** + * The max content length that the server will handle. + */ + public static final ConfigOption<Integer> REST_SERVER_CONTENT_MAX_MB = + key("rest.server.content.max.mb") + .defaultValue(10); + + /** + * The max content length that the client will handle. + */ + public static final ConfigOption<Integer> REST_CLIENT_CONTENT_MAX_MB = + key("rest.client.content.max.mb") + .defaultValue(1); + } http://git-wip-us.apache.org/repos/asf/flink/blob/3d0abd28/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 3a0f6df..801119d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -103,7 +103,7 @@ public class RestClient { socketChannel.pipeline() .addLast(new HttpClientCodec()) - .addLast(new HttpObjectAggregator(1024 * 1024)) + .addLast(new HttpObjectAggregator(configuration.getMaxContentLength())) .addLast(new ClientHandler()) .addLast(new PipelineErrorHandler(LOG)); } http://git-wip-us.apache.org/repos/asf/flink/blob/3d0abd28/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java index 86578a2..782cb4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java @@ -39,9 +39,15 @@ public final class RestClientConfiguration { private final long connectionTimeout; - private RestClientConfiguration(@Nullable SSLEngine sslEngine, final long connectionTimeout) { + private final int maxContentLength; + + private RestClientConfiguration( + @Nullable final SSLEngine sslEngine, + final long connectionTimeout, + final int maxContentLength) { this.sslEngine = sslEngine; this.connectionTimeout = connectionTimeout; + this.maxContentLength = maxContentLength; } /** @@ -62,6 +68,15 @@ public final class RestClientConfiguration { } /** + * Returns the max content length that the REST client endpoint could handle. + * + * @return max content length that the REST client endpoint could handle + */ + public int getMaxContentLength() { + return maxContentLength; + } + + /** * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}. * * @param config configuration from which the REST client endpoint configuration should be created from @@ -89,6 +104,11 @@ public final class RestClientConfiguration { final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT); - return new RestClientConfiguration(sslEngine, connectionTimeout); + int maxContentLength = config.getInteger(RestOptions.REST_CLIENT_CONTENT_MAX_MB) * 1024 * 1024; + if (maxContentLength <= 0) { + throw new ConfigurationException("Max content length for client must be a positive integer: " + maxContentLength); + } + + return new RestClientConfiguration(sslEngine, connectionTimeout, maxContentLength); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3d0abd28/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index f131ec1..42af4c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -78,6 +78,7 @@ public abstract class RestServerEndpoint { private final int configuredPort; private final SSLEngine sslEngine; private final Path uploadDir; + private final int maxContentLength; private final CompletableFuture<Void> terminationFuture; @@ -96,6 +97,8 @@ public abstract class RestServerEndpoint { this.uploadDir = configuration.getUploadDir(); createUploadDir(uploadDir, log); + this.maxContentLength = configuration.getMaxContentLength(); + terminationFuture = new CompletableFuture<>(); this.restAddress = null; @@ -156,7 +159,7 @@ public abstract class RestServerEndpoint { ch.pipeline() .addLast(new HttpServerCodec()) .addLast(new FileUploadHandler(uploadDir)) - .addLast(new HttpObjectAggregator(MAX_REQUEST_SIZE_BYTES)) + .addLast(new HttpObjectAggregator(maxContentLength)) .addLast(handler.name(), handler) .addLast(new PipelineErrorHandler(log)); } http://git-wip-us.apache.org/repos/asf/flink/blob/3d0abd28/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java index c411b51..3685e2d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java @@ -51,11 +51,14 @@ public final class RestServerEndpointConfiguration { private final Path uploadDir; + private final int maxContentLength; + private RestServerEndpointConfiguration( @Nullable String restBindAddress, int restBindPort, @Nullable SSLEngine sslEngine, - final Path uploadDir) { + final Path uploadDir, + final int maxContentLength) { Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536["); @@ -63,6 +66,7 @@ public final class RestServerEndpointConfiguration { this.restBindPort = restBindPort; this.sslEngine = sslEngine; this.uploadDir = requireNonNull(uploadDir); + this.maxContentLength = maxContentLength; } /** @@ -100,6 +104,15 @@ public final class RestServerEndpointConfiguration { } /** + * Returns the max content length that the REST server endpoint could handle. + * + * @return max content length that the REST server endpoint could handle + */ + public int getMaxContentLength() { + return maxContentLength; + } + + /** * Creates and returns a new {@link RestServerEndpointConfiguration} from the given {@link Configuration}. * * @param config configuration from which the REST server endpoint configuration should be created from @@ -131,6 +144,11 @@ public final class RestServerEndpointConfiguration { config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)), "flink-web-upload-" + UUID.randomUUID()); - return new RestServerEndpointConfiguration(address, port, sslEngine, uploadDir); + int maxContentLength = config.getInteger(RestOptions.REST_SERVER_CONTENT_MAX_MB) * 1024 * 1024; + if (maxContentLength <= 0) { + throw new ConfigurationException("Max content length for server must be a positive integer: " + maxContentLength); + } + + return new RestServerEndpointConfiguration(address, port, sslEngine, uploadDir, maxContentLength); } }
