GitHub user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/5059#discussion_r152854163
--- Diff:
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
---
@@ -0,0 +1,1097 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A file system that limits the number of concurrently open input streams,
+ * output streams, and total streams for a target file system.
+ *
+ * <p>This file system can wrap another existing file system in cases where
+ * the target file system cannot handle certain connection spikes and
connections
+ * would fail in that case. This happens, for example, for very small HDFS
clusters
+ * with few RPC handlers, when a large Flink job tries to build up many
connections during
+ * a checkpoint.
+ *
+ * <p>The file system may track the progress of streams and close streams that have been
+ * inactive for too long, to prevent stuck streams from taking up the complete pool.
+ * Rather than having a dedicated reaper thread, the calls that try to
open a new stream
+ * periodically check the currently open streams once the limit of open
streams is reached.
+ */
+@Internal
+public class LimitedConnectionsFileSystem extends FileSystem {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
+
+ /** The original file system to which connections are limited. */
+ private final FileSystem originalFs;
+
+ /** The lock that synchronizes connection bookkeeping. */
+ private final ReentrantLock lock;
+
+ /** Condition for threads that are blocking on the availability of new
connections. */
+ private final Condition available;
+
+ /** The maximum number of concurrently open output streams. */
+ private final int maxNumOpenOutputStreams;
+
+ /** The maximum number of concurrently open input streams. */
+ private final int maxNumOpenInputStreams;
+
+ /** The maximum number of concurrently open streams (input + output). */
+ private final int maxNumOpenStreamsTotal;
+
+ /** The nanoseconds that a opening a stream may wait for availability.
*/
+ private final long streamOpenTimeoutNanos;
+
+ /** The nanoseconds that a stream may spend not writing any bytes
before it is closed as inactive. */
+ private final long streamInactivityTimeoutNanos;
+
+ /** The set of currently open output streams. */
+ @GuardedBy("lock")
+ private final HashSet<OutStream> openOutputStreams;
+
+ /** The set of currently open input streams. */
+ @GuardedBy("lock")
+ private final HashSet<InStream> openInputStreams;
+
+ /** The number of output streams reserved to be opened. */
+ @GuardedBy("lock")
+ private int numReservedOutputStreams;
+
+ /** The number of input streams reserved to be opened. */
+ @GuardedBy("lock")
+ private int numReservedInputStreams;
+
+ //
------------------------------------------------------------------------
+
+ /**
+ * Creates a new output connection limiting file system.
+ *
+ * <p>If streams are inactive (meaning not writing bytes) for longer
than the given timeout,
+ * then they are terminated as "inactive", to prevent that the limited
number of connections gets
+ * stuck on only blocked threads.
+ *
+ * @param originalFs The original file system to which
connections are limited.
+ * @param maxNumOpenStreamsTotal The maximum number of concurrent open
streams (0 means no limit).
+ */
+ public LimitedConnectionsFileSystem(FileSystem originalFs, int
maxNumOpenStreamsTotal) {
+ this(originalFs, maxNumOpenStreamsTotal, 0, 0);
+ }
+
+ /**
+ * Creates a new output connection limiting file system.
+ *
+ * <p>If streams are inactive (meaning not writing bytes) for longer
than the given timeout,
+ * then they are terminated as "inactive", to prevent that the limited
number of connections gets
+ * stuck on only blocked threads.
+ *
+ * @param originalFs The original file system to which
connections are limited.
+ * @param maxNumOpenStreamsTotal The maximum number of concurrent open
streams (0 means no limit).
+ * @param streamOpenTimeout The maximum number of milliseconds
that the file system will wait when
+ * no more connections are currently
permitted.
+ * @param streamInactivityTimeout The milliseconds that a stream may
spend not writing any
+ * bytes before it is closed as inactive.
+ */
+ public LimitedConnectionsFileSystem(
+ FileSystem originalFs,
+ int maxNumOpenStreamsTotal,
+ long streamOpenTimeout,
+ long streamInactivityTimeout) {
+ this(originalFs, maxNumOpenStreamsTotal, 0, 0,
streamOpenTimeout, streamInactivityTimeout);
+ }
+
+ /**
+ * Creates a new output connection limiting file system, limiting input
and output streams with
+ * potentially different quotas.
+ *
+ * <p>If streams are inactive (meaning not writing bytes) for longer
than the given timeout,
+ * then they are terminated as "inactive", to prevent that the limited
number of connections gets
+ * stuck on only blocked threads.
+ *
+ * @param originalFs The original file system to which
connections are limited.
+ * @param maxNumOpenStreamsTotal The maximum number of concurrent open
streams (0 means no limit).
+ * @param maxNumOpenOutputStreams The maximum number of concurrent open
output streams (0 means no limit).
+ * @param maxNumOpenInputStreams The maximum number of concurrent open
input streams (0 means no limit).
+ * @param streamOpenTimeout The maximum number of milliseconds
that the file system will wait when
+ * no more connections are currently
permitted.
+ * @param streamInactivityTimeout The milliseconds that a stream may
spend not writing any
+ * bytes before it is closed as inactive.
+ */
+ public LimitedConnectionsFileSystem(
+ FileSystem originalFs,
+ int maxNumOpenStreamsTotal,
+ int maxNumOpenOutputStreams,
+ int maxNumOpenInputStreams,
+ long streamOpenTimeout,
+ long streamInactivityTimeout) {
+
+ checkArgument(maxNumOpenStreamsTotal >= 0,
"maxNumOpenStreamsTotal must be >= 0");
+ checkArgument(maxNumOpenOutputStreams >= 0,
"maxNumOpenOutputStreams must be >= 0");
+ checkArgument(maxNumOpenInputStreams >= 0,
"maxNumOpenInputStreams must be >= 0");
+ checkArgument(streamOpenTimeout >= 0, "stream opening timeout
must be >= 0 (0 means infinite timeout)");
+ checkArgument(streamInactivityTimeout >= 0, "stream inactivity
timeout must be >= 0 (0 means infinite timeout)");
+
+ this.originalFs = checkNotNull(originalFs, "originalFs");
+ this.lock = new ReentrantLock(true);
+ this.available = lock.newCondition();
+ this.openOutputStreams = new HashSet<>();
+ this.openInputStreams = new HashSet<>();
+ this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
+ this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
+ this.maxNumOpenInputStreams = maxNumOpenInputStreams;
+
+ // assign nanos overflow aware
+ final long openTimeoutNanos = streamOpenTimeout * 1_000_000;
+ final long inactivityTimeoutNanos = streamInactivityTimeout *
1_000_000;
+
+ this.streamOpenTimeoutNanos =
+ openTimeoutNanos >= streamOpenTimeout ?
openTimeoutNanos : Long.MAX_VALUE;
+
+ this.streamInactivityTimeoutNanos =
+ inactivityTimeoutNanos >=
streamInactivityTimeout ? inactivityTimeoutNanos : Long.MAX_VALUE;
+ }
+
+ //
------------------------------------------------------------------------
+
+ /**
+ * Gets the maximum number of concurrently open output streams.
+ */
+ public int getMaxNumOpenOutputStreams() {
+ return maxNumOpenOutputStreams;
+ }
+
+ /**
+ * Gets the maximum number of concurrently open input streams.
+ */
+ public int getMaxNumOpenInputStreams() {
+ return maxNumOpenInputStreams;
+ }
+
+ /**
+ * Gets the maximum number of concurrently open streams (input +
output).
+ */
+ public int getMaxNumOpenStreamsTotal() {
+ return maxNumOpenStreamsTotal;
+ }
+
+ /**
+ * Gets the number of milliseconds that a opening a stream may wait for
availability in the
+ * connection pool.
+ */
+ public long getStreamOpenTimeout() {
+ return streamOpenTimeoutNanos / 1_000_000;
+ }
+
+ /**
+ * Gets the milliseconds that a stream may spend not writing any bytes
before it is closed as inactive.
+ */
+ public long getStreamInactivityTimeout() {
+ return streamInactivityTimeoutNanos / 1_000_000;
+ }
+
+ /**
+ * Gets the total number of open streams (input plus output).
+ */
+ public int getTotalNumberOfOpenStreams() {
+ lock.lock();
+ try {
+ return numReservedOutputStreams +
numReservedInputStreams;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Gets the number of currently open output streams.
+ */
+ public int getNumberOfOpenOutputStreams() {
+ lock.lock();
+ try {
+ return numReservedOutputStreams;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Gets the number of currently open input streams.
+ */
+ public int getNumberOfOpenInputStreams() {
+ return numReservedInputStreams;
+ }
+
+ //
------------------------------------------------------------------------
+ // input & output stream opening methods
+ //
------------------------------------------------------------------------
+
+ @Override
+ public FSDataOutputStream create(Path f, WriteMode overwriteMode)
throws IOException {
+ return createOutputStream(() -> originalFs.create(f,
overwriteMode));
+ }
+
+ @Override
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ public FSDataOutputStream create(
+ Path f,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize) throws IOException {
+
+ return createOutputStream(() -> originalFs.create(f, overwrite,
bufferSize, replication, blockSize));
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws
IOException {
+ return createInputStream(() -> originalFs.open(f, bufferSize));
+ }
+
+ @Override
+ public FSDataInputStream open(Path f) throws IOException {
+ return createInputStream(() -> originalFs.open(f));
+ }
+
+ private FSDataOutputStream createOutputStream(
+ final SupplierWithException<FSDataOutputStream,
IOException> streamOpener) throws IOException {
+
+ final SupplierWithException<OutStream, IOException>
wrappedStreamOpener =
+ () -> new OutStream(streamOpener.get(), this);
+
+ return createStream(wrappedStreamOpener, openOutputStreams,
true);
+ }
+
+ private FSDataInputStream createInputStream(
+ final SupplierWithException<FSDataInputStream,
IOException> streamOpener) throws IOException {
+
+ final SupplierWithException<InStream, IOException>
wrappedStreamOpener =
+ () -> new InStream(streamOpener.get(), this);
+
+ return createStream(wrappedStreamOpener, openInputStreams,
false);
+ }
+
+ //
------------------------------------------------------------------------
+ // other delegating file system methods
+ //
------------------------------------------------------------------------
+
+ @Override
+ public FileSystemKind getKind() {
+ return originalFs.getKind();
+ }
+
+ @Override
+ public boolean isDistributedFS() {
+ return originalFs.isDistributedFS();
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return originalFs.getWorkingDirectory();
+ }
+
+ @Override
+ public Path getHomeDirectory() {
+ return originalFs.getHomeDirectory();
+ }
+
+ @Override
+ public URI getUri() {
+ return originalFs.getUri();
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return originalFs.getFileStatus(f);
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long
start, long len) throws IOException {
+ return originalFs.getFileBlockLocations(file, start, len);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws IOException {
+ return originalFs.listStatus(f);
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ return originalFs.delete(f, recursive);
+ }
+
+ @Override
+ public boolean mkdirs(Path f) throws IOException {
+ return originalFs.mkdirs(f);
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ return originalFs.rename(src, dst);
+ }
+
+ @Override
+ public boolean exists(Path f) throws IOException {
+ return originalFs.exists(f);
+ }
+
+ @Override
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ public long getDefaultBlockSize() {
+ return originalFs.getDefaultBlockSize();
+ }
+
+ //
------------------------------------------------------------------------
+
+ private <T extends StreamWithTimeout> T createStream(
+ final SupplierWithException<T, IOException>
streamOpener,
+ final HashSet<T> openStreams,
+ final boolean output) throws IOException {
+
+ final int outputLimit = output && maxNumOpenInputStreams > 0 ?
maxNumOpenOutputStreams : Integer.MAX_VALUE;
+ final int inputLimit = !output && maxNumOpenInputStreams > 0 ?
maxNumOpenInputStreams : Integer.MAX_VALUE;
+ final int totalLimit = maxNumOpenStreamsTotal > 0 ?
maxNumOpenStreamsTotal : Integer.MAX_VALUE;
+ final int outputCredit = output ? 1 : 0;
+ final int inputCredit = output ? 0 : 1;
+
+ // because waiting for availability may take long, we need to
be interruptible here
+ // and handle interrupted exceptions as I/O errors
+ // even though the code is written to make sure the lock is
held for a short time only,
+ // making the lock acquisition interruptible helps to guard
against the cases where
+ // a supposedly fast operation (like 'getPos()' on a stream)
actually takes long.
+ try {
+ lock.lockInterruptibly();
+ try {
+ // some integrity checks
+ assert openOutputStreams.size() <=
numReservedOutputStreams;
+ assert openInputStreams.size() <=
numReservedInputStreams;
+
+ // wait until there are few enough streams so
we can open another
+ waitForAvailability(totalLimit, outputLimit,
inputLimit);
+
+ // We do not open the stream here in the locked
scope because opening a stream
+ // could take a while. Holding the lock during
that operation would block all concurrent
+ // attempts to try and open a stream,
effectively serializing all calls to open the streams.
+ numReservedOutputStreams += outputCredit;
+ numReservedInputStreams += inputCredit;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ catch (InterruptedException e) {
+ // restore interruption flag
+ Thread.currentThread().interrupt();
+ throw new IOException("interrupted before opening
stream");
+ }
+
+ // open the stream outside the lock.
+ boolean success = false;
+ try {
+ final T out = streamOpener.get();
+
+ // add the stream to the set, need to re-acquire the
lock
+ lock.lock();
+ try {
+ openStreams.add(out);
+ } finally {
+ lock.unlock();
+ }
+
+ // good, can now return cleanly
+ success = true;
+ return out;
+ }
+ finally {
+ if (!success) {
+ // remove the reserved credit
+ // we must open this non-interruptibly, because
this must succeed!
+ lock.lock();
+ try {
+ numReservedOutputStreams -=
outputCredit;
+ numReservedInputStreams -= inputCredit;
+ available.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
+
+ @GuardedBy("lock")
+ private void waitForAvailability(
+ int totalLimit,
+ int outputLimit,
+ int inputLimit) throws InterruptedException,
IOException {
+
+ checkState(lock.isHeldByCurrentThread());
+
+ // compute the deadline of this operations
+ final long deadline;
+ if (streamOpenTimeoutNanos == 0) {
+ deadline = Long.MAX_VALUE;
+ } else {
+ long deadlineNanos = System.nanoTime() +
streamOpenTimeoutNanos;
+ // check for overflow
+ deadline = deadlineNanos > 0 ? deadlineNanos :
Long.MAX_VALUE;
+ }
+
+ // wait for available connections
+ long timeLeft;
+
+ if (streamInactivityTimeoutNanos == 0) {
+ // simple case: just wait
+ while ((timeLeft = (deadline - System.nanoTime())) > 0
&&
+ !hasAvailability(totalLimit,
outputLimit, inputLimit)) {
+
+ available.await(timeLeft, TimeUnit.NANOSECONDS);
+ }
+ }
+ else {
+ // complex case: chase down inactive streams
+ final long checkIntervalNanos =
(streamInactivityTimeoutNanos >>> 1) + 1;
+
+ long now;
+ while ((timeLeft = (deadline - (now =
System.nanoTime()))) > 0 && // while still within timeout
+ !hasAvailability(totalLimit,
outputLimit, inputLimit)) {
+
+ // check all streams whether there in one that
has been inactive for too long
+ if (!(closeInactiveStream(openOutputStreams,
now) || closeInactiveStream(openInputStreams, now))) {
+ // only wait if we did not manage to
close any stream.
+ // otherwise eagerly check again if we
have availability now (we should have!)
+ long timeToWait =
Math.min(checkIntervalNanos, timeLeft);
+ available.await(timeToWait,
TimeUnit.NANOSECONDS);
+ }
+ }
+ }
+
+ // check for timeout
+ // we check availability again to catch cases where the timeout
expired while waiting
+ // to re-acquire the lock
+ if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit,
inputLimit)) {
+ throw new IOException(String.format(
+ "Timeout while waiting for an available
stream/connect. " +
+ "limits: total=%d, input=%d, output=%d
; Open: input=%d, output=%d ; timeout: %d ms",
+ maxNumOpenStreamsTotal,
maxNumOpenInputStreams, maxNumOpenOutputStreams,
+ numReservedInputStreams,
numReservedOutputStreams, getStreamOpenTimeout()));
+ }
+ }
+
+ @GuardedBy("lock")
+ private boolean hasAvailability(int totalLimit, int outputLimit, int
inputLimit) {
+ return numReservedOutputStreams < outputLimit &&
+ numReservedInputStreams < inputLimit &&
+ numReservedOutputStreams +
numReservedInputStreams < totalLimit;
+ }
+
+ @GuardedBy("lock")
+ private boolean closeInactiveStream(HashSet<? extends
StreamWithTimeout> streams, long nowNanos) {
+ for (StreamWithTimeout stream : streams) {
+ try {
+ // If the stream is closed already, it will be
removed anyways, so we
+ // do not classify it as inactive. We also skip
the check if another check happened too recently.
+ if (stream.isClosed() || nowNanos <
stream.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos) {
+ // interval since last check not yet
over
+ return false;
+ }
+ else if
(!stream.checkNewBytesAndMark(nowNanos)) {
+ stream.closeDueToTimeout();
+ return true;
+ }
+ }
+ catch (StreamTimeoutException ignored) {
+ // may happen due to races
+ }
+ catch (IOException e) {
+ // only log on debug level here, to avoid log
spamming
+ LOG.debug("Could not check for stream progress
to determine inactivity", e);
--- End diff --
Good point — I thought about that as well, but was not sure. I would be hesitant to do
that the first time such an exception occurs, and handling it otherwise would need more
elaborate logic.
---