Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6149#discussion_r195142699
--- Diff:
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
---
@@ -166,12 +174,47 @@ public LimitedConnectionsFileSystem(
int maxNumOpenInputStreams,
long streamOpenTimeout,
long streamInactivityTimeout) {
+ this(originalFs, maxNumOpenStreamsTotal,
maxNumOpenOutputStreams, maxNumOpenInputStreams, streamOpenTimeout,
streamInactivityTimeout, 0, 0);
+ }
+
+ /**
+ * Creates a new output connection limiting file system, limiting input
and output streams with
+ * potentially different quotas.
+ *
+ * <p>If streams are inactive (meaning not writing bytes) for longer
than the given timeout,
+ * then they are terminated as "inactive", to prevent that the limited
number of connections gets
+ * stuck on only blocked threads.
+ *
+ * @param originalFs The original file system to which
connections are limited.
+ * @param maxNumOpenStreamsTotal The maximum number of concurrent open
streams (0 means no limit).
+ * @param maxNumOpenOutputStreams The maximum number of concurrent open
output streams (0 means no limit).
+ * @param maxNumOpenInputStreams The maximum number of concurrent open
input streams (0 means no limit).
+ * @param streamOpenTimeout The maximum number of milliseconds
that the file system will wait when
+ * no more connections are currently
permitted.
+ * @param streamInactivityTimeout The milliseconds that a stream may
spend not writing any
+ * bytes before it is closed as inactive.
+ * @param inputBytesPerSecondRate The rate limit for bytes read per
second on the FileSystem (0 means no limit).
+ * @param outputBytesPerSecondRate The rate limit for bytes written
per second on the FileSystem (0 means no limit).
+ */
+
+ public LimitedConnectionsFileSystem(
+ FileSystem originalFs,
+ int maxNumOpenStreamsTotal,
+ int maxNumOpenOutputStreams,
+ int maxNumOpenInputStreams,
+ long streamOpenTimeout,
+ long streamInactivityTimeout,
+ long inputBytesPerSecondRate,
+ long outputBytesPerSecondRate
+ ) {
--- End diff --
Remove the newline after the last argument, so that the closing `) {` sits on the same line as `long outputBytesPerSecondRate`.
---