Repository: flink
Updated Branches:
  refs/heads/master 445cdfd57 -> da3fc4fde


[FLINK-7521] Add config option to set the content length limit of REST server 
and client


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f46d6db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f46d6db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f46d6db

Branch: refs/heads/master
Commit: 6f46d6db182fa768417ac1397c0abe3f30da1942
Parents: 445cdfd
Author: zjureel <[email protected]>
Authored: Thu Sep 7 10:39:39 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Wed Mar 14 09:37:50 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/RestOptions.java | 15 ++++++++++++
 .../apache/flink/runtime/rest/RestClient.java   |  2 +-
 .../runtime/rest/RestClientConfiguration.java   | 24 ++++++++++++++++++--
 .../flink/runtime/rest/RestServerEndpoint.java  |  5 +++-
 .../rest/RestServerEndpointConfiguration.java   | 22 ++++++++++++++++--
 5 files changed, 62 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f46d6db/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 888be08..61bb085 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -81,4 +81,19 @@ public class RestOptions {
                key("rest.connection-timeout")
                        .defaultValue(15_000L)
                        .withDescription("The maximum time in ms for the client 
to establish a TCP connection.");
+
+       /**
+        * The max content length that the server will handle.
+        */
+       public static final ConfigOption<Integer> REST_SERVER_CONTENT_MAX_MB =
+               key("rest.server.content.max.mb")
+                       .defaultValue(10);
+
+       /**
+        * The max content length that the client will handle.
+        */
+       public static final ConfigOption<Integer> REST_CLIENT_CONTENT_MAX_MB =
+               key("rest.client.content.max.mb")
+                       .defaultValue(1);
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f46d6db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 3a0f6df..801119d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -103,7 +103,7 @@ public class RestClient {
 
                                socketChannel.pipeline()
                                        .addLast(new HttpClientCodec())
-                                       .addLast(new HttpObjectAggregator(1024 
* 1024))
+                                       .addLast(new 
HttpObjectAggregator(configuration.getMaxContentLength()))
                                        .addLast(new ClientHandler())
                                        .addLast(new PipelineErrorHandler(LOG));
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f46d6db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index 86578a2..782cb4e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -39,9 +39,15 @@ public final class RestClientConfiguration {
 
        private final long connectionTimeout;
 
-       private RestClientConfiguration(@Nullable SSLEngine sslEngine, final 
long connectionTimeout) {
+       private final int maxContentLength;
+
+       private RestClientConfiguration(
+                       @Nullable final SSLEngine sslEngine,
+                       final long connectionTimeout,
+                       final int maxContentLength) {
                this.sslEngine = sslEngine;
                this.connectionTimeout = connectionTimeout;
+               this.maxContentLength = maxContentLength;
        }
 
        /**
@@ -62,6 +68,15 @@ public final class RestClientConfiguration {
        }
 
        /**
+        * Returns the max content length that the REST client endpoint could 
handle.
+        *
+        * @return max content length that the REST client endpoint could handle
+        */
+       public int getMaxContentLength() {
+               return maxContentLength;
+       }
+
+       /**
         * Creates and returns a new {@link RestClientConfiguration} from the 
given {@link Configuration}.
         *
         * @param config configuration from which the REST client endpoint 
configuration should be created from
@@ -89,6 +104,11 @@ public final class RestClientConfiguration {
 
                final long connectionTimeout = 
config.getLong(RestOptions.CONNECTION_TIMEOUT);
 
-               return new RestClientConfiguration(sslEngine, 
connectionTimeout);
+               int maxContentLength = 
config.getInteger(RestOptions.REST_CLIENT_CONTENT_MAX_MB) * 1024 * 1024;
+               if (maxContentLength <= 0) {
+                       throw new ConfigurationException("Max content length 
for client must be a positive integer: " + maxContentLength);
+               }
+
+               return new RestClientConfiguration(sslEngine, 
connectionTimeout, maxContentLength);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f46d6db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index f131ec1..42af4c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -78,6 +78,7 @@ public abstract class RestServerEndpoint {
        private final int configuredPort;
        private final SSLEngine sslEngine;
        private final Path uploadDir;
+       private final int maxContentLength;
 
        private final CompletableFuture<Void> terminationFuture;
 
@@ -96,6 +97,8 @@ public abstract class RestServerEndpoint {
                this.uploadDir = configuration.getUploadDir();
                createUploadDir(uploadDir, log);
 
+               this.maxContentLength = configuration.getMaxContentLength();
+
                terminationFuture = new CompletableFuture<>();
 
                this.restAddress = null;
@@ -156,7 +159,7 @@ public abstract class RestServerEndpoint {
                                        ch.pipeline()
                                                .addLast(new HttpServerCodec())
                                                .addLast(new 
FileUploadHandler(uploadDir))
-                                               .addLast(new 
HttpObjectAggregator(MAX_REQUEST_SIZE_BYTES))
+                                               .addLast(new 
HttpObjectAggregator(maxContentLength))
                                                .addLast(handler.name(), 
handler)
                                                .addLast(new 
PipelineErrorHandler(log));
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f46d6db/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index c411b51..3685e2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -51,11 +51,14 @@ public final class RestServerEndpointConfiguration {
 
        private final Path uploadDir;
 
+       private final int maxContentLength;
+
        private RestServerEndpointConfiguration(
                        @Nullable String restBindAddress,
                        int restBindPort,
                        @Nullable SSLEngine sslEngine,
-                       final Path uploadDir) {
+                       final Path uploadDir,
+                       final int maxContentLength) {
 
                Preconditions.checkArgument(0 <= restBindPort && restBindPort < 
65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
 
@@ -63,6 +66,7 @@ public final class RestServerEndpointConfiguration {
                this.restBindPort = restBindPort;
                this.sslEngine = sslEngine;
                this.uploadDir = requireNonNull(uploadDir);
+               this.maxContentLength = maxContentLength;
        }
 
        /**
@@ -100,6 +104,15 @@ public final class RestServerEndpointConfiguration {
        }
 
        /**
+        * Returns the max content length that the REST server endpoint could 
handle.
+        *
+        * @return max content length that the REST server endpoint could handle
+        */
+       public int getMaxContentLength() {
+               return maxContentLength;
+       }
+
+       /**
         * Creates and returns a new {@link RestServerEndpointConfiguration} 
from the given {@link Configuration}.
         *
         * @param config configuration from which the REST server endpoint 
configuration should be created from
@@ -131,6 +144,11 @@ public final class RestServerEndpointConfiguration {
                        config.getString(WebOptions.UPLOAD_DIR, 
config.getString(WebOptions.TMP_DIR)),
                        "flink-web-upload-" + UUID.randomUUID());
 
-               return new RestServerEndpointConfiguration(address, port, 
sslEngine, uploadDir);
+               int maxContentLength = 
config.getInteger(RestOptions.REST_SERVER_CONTENT_MAX_MB) * 1024 * 1024;
+               if (maxContentLength <= 0) {
+                       throw new ConfigurationException("Max content length 
for server must be a positive integer: " + maxContentLength);
+               }
+
+               return new RestServerEndpointConfiguration(address, port, 
sslEngine, uploadDir, maxContentLength);
        }
 }

Reply via email to