Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5059#discussion_r152853859
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/core/fs/LimitedConnectionsFileSystem.java
 ---
    @@ -0,0 +1,1097 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.configuration.ConfigOption;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.CoreOptions;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.util.function.SupplierWithException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +import javax.annotation.concurrent.GuardedBy;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.HashSet;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.locks.Condition;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * A file system that limits the number of concurrently open input streams,
    + * output streams, and total streams for a target file system.
    + *
    + * <p>This file system can wrap another existing file system in cases where
    + * the target file system cannot handle certain connection spikes and 
connections
    + * would fail in that case. This happens, for example, for very small HDFS 
clusters
    + * with few RPC handlers, when a large Flink job tries to build up many 
connections during
    + * a checkpoint.
    + *
    + * <p>The file system may track the progress of streams and close streams that have been
    + * inactive for too long, to prevent locked-up streams from taking up the complete pool.
    + * Rather than having a dedicated reaper thread, the calls that try to 
open a new stream
    + * periodically check the currently open streams once the limit of open 
streams is reached.
    + */
    +@Internal
    +public class LimitedConnectionsFileSystem extends FileSystem {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
    +
    +   /** The original file system to which connections are limited. */
    +   private final FileSystem originalFs;
    +
    +   /** The lock that synchronizes connection bookkeeping. */
    +   private final ReentrantLock lock;
    +
    +   /** Condition for threads that are blocking on the availability of new 
connections. */
    +   private final Condition available;
    +
    +   /** The maximum number of concurrently open output streams. */
    +   private final int maxNumOpenOutputStreams;
    +
    +   /** The maximum number of concurrently open input streams. */
    +   private final int maxNumOpenInputStreams;
    +
    +   /** The maximum number of concurrently open streams (input + output). */
    +   private final int maxNumOpenStreamsTotal;
    +
    +   /** The nanoseconds that opening a stream may wait for availability. */
    +   private final long streamOpenTimeoutNanos;
    +
    +   /** The nanoseconds that a stream may spend not writing any bytes 
before it is closed as inactive. */
    +   private final long streamInactivityTimeoutNanos;
    +
    +   /** The set of currently open output streams. */
    +   @GuardedBy("lock")
    +   private final HashSet<OutStream> openOutputStreams;
    +
    +   /** The set of currently open input streams. */
    +   @GuardedBy("lock")
    +   private final HashSet<InStream> openInputStreams;
    +
    +   /** The number of output streams reserved to be opened. */
    +   @GuardedBy("lock")
    +   private int numReservedOutputStreams;
    +
    +   /** The number of input streams reserved to be opened. */
    +   @GuardedBy("lock")
    +   private int numReservedInputStreams;
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Creates a connection-limiting file system that only caps the total number of
    +    * concurrently open streams.
    +    *
    +    * <p>Both the stream-opening timeout and the stream-inactivity timeout are passed on as
    +    * {@code 0}, which the other constructors treat as "infinite timeout".
    +    *
    +    * @param originalFs              The original file system to which connections are limited.
    +    * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +    */
    +   public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
    +           this(originalFs, maxNumOpenStreamsTotal, 0L, 0L);
    +   }
    +
    +   /**
    +    * Creates a connection-limiting file system that caps the total number of concurrently
    +    * open streams and applies the given timeouts.
    +    *
    +    * <p>No separate quota is set for input or output streams (both are passed on as
    +    * {@code 0}, meaning "no limit").
    +    *
    +    * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +    * then they are terminated as "inactive", so that the limited number of connections does
    +    * not get stuck on only blocked threads.
    +    *
    +    * @param originalFs              The original file system to which connections are limited.
    +    * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +    * @param streamOpenTimeout       The maximum number of milliseconds that the file system will
    +    *                                wait when no more connections are currently permitted.
    +    * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
    +    *                                bytes before it is closed as inactive.
    +    */
    +   public LimitedConnectionsFileSystem(
    +                   FileSystem originalFs,
    +                   int maxNumOpenStreamsTotal,
    +                   long streamOpenTimeout,
    +                   long streamInactivityTimeout) {
    +           this(
    +                           originalFs,
    +                           maxNumOpenStreamsTotal,
    +                           0, // no separate limit for output streams
    +                           0, // no separate limit for input streams
    +                           streamOpenTimeout,
    +                           streamInactivityTimeout);
    +   }
    +
    +   /**
    +    * Creates a new connection-limiting file system, limiting input and output streams with
    +    * potentially different quotas.
    +    *
    +    * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
    +    * then they are terminated as "inactive", so that the limited number of connections does
    +    * not get stuck on only blocked threads.
    +    *
    +    * <p>Both timeouts are given in milliseconds and are stored internally in nanoseconds.
    +    * A millisecond value too large to be expressed in nanoseconds is clamped to
    +    * {@link Long#MAX_VALUE} nanoseconds.
    +    *
    +    * @param originalFs              The original file system to which connections are limited.
    +    * @param maxNumOpenStreamsTotal  The maximum number of concurrent open streams (0 means no limit).
    +    * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit).
    +    * @param maxNumOpenInputStreams  The maximum number of concurrent open input streams (0 means no limit).
    +    * @param streamOpenTimeout       The maximum number of milliseconds that the file system will
    +    *                                wait when no more connections are currently permitted
    +    *                                (0 means infinite timeout).
    +    * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
    +    *                                bytes before it is closed as inactive (0 means infinite timeout).
    +    */
    +   public LimitedConnectionsFileSystem(
    +                   FileSystem originalFs,
    +                   int maxNumOpenStreamsTotal,
    +                   int maxNumOpenOutputStreams,
    +                   int maxNumOpenInputStreams,
    +                   long streamOpenTimeout,
    +                   long streamInactivityTimeout) {
    +
    +           checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
    +           checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
    +           checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
    +           checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0 means infinite timeout)");
    +           checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >= 0 (0 means infinite timeout)");
    +
    +           this.originalFs = checkNotNull(originalFs, "originalFs");
    +           // fair lock: the lock is created with the fairness flag set
    +           this.lock = new ReentrantLock(true);
    +           this.available = lock.newCondition();
    +           this.openOutputStreams = new HashSet<>();
    +           this.openInputStreams = new HashSet<>();
    +           this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
    +           this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
    +           this.maxNumOpenInputStreams = maxNumOpenInputStreams;
    +
    +           // Convert milliseconds to nanoseconds, saturating at Long.MAX_VALUE.
    +           // The bound is checked BEFORE multiplying: a wrapped product can still be
    +           // larger than the input, so comparing product and input afterwards does not
    +           // reliably detect the overflow.
    +           final long nanosPerMilli = 1_000_000L;
    +           final long maxConvertibleMillis = Long.MAX_VALUE / nanosPerMilli;
    +
    +           this.streamOpenTimeoutNanos = streamOpenTimeout > maxConvertibleMillis
    +                           ? Long.MAX_VALUE
    +                           : streamOpenTimeout * nanosPerMilli;
    +
    +           this.streamInactivityTimeoutNanos = streamInactivityTimeout > maxConvertibleMillis
    +                           ? Long.MAX_VALUE
    +                           : streamInactivityTimeout * nanosPerMilli;
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Returns the configured maximum number of concurrently open output streams,
    +    * exactly as passed to the constructor.
    +    */
    +   public int getMaxNumOpenOutputStreams() {
    +           return this.maxNumOpenOutputStreams;
    +   }
    +
    +   /**
    +    * Returns the configured maximum number of concurrently open input streams,
    +    * exactly as passed to the constructor.
    +    */
    +   public int getMaxNumOpenInputStreams() {
    +           return this.maxNumOpenInputStreams;
    +   }
    +
    +   /**
    +    * Returns the configured maximum number of concurrently open streams (input + output),
    +    * exactly as passed to the constructor.
    +    */
    +   public int getMaxNumOpenStreamsTotal() {
    +           return this.maxNumOpenStreamsTotal;
    +   }
    +
    +   /**
    +    * Gets the number of milliseconds that opening a stream may wait for availability in the
    +    * connection pool.
    +    *
    +    * <p>The value is derived from the internally stored nanosecond timeout by truncating
    +    * division.
    +    */
    +   public long getStreamOpenTimeout() {
    +           return TimeUnit.NANOSECONDS.toMillis(streamOpenTimeoutNanos);
    +   }
    +
    +   /**
    +    * Gets the number of milliseconds that a stream may spend not writing any bytes before
    +    * it is closed as inactive.
    +    *
    +    * <p>The value is derived from the internally stored nanosecond timeout by truncating
    +    * division.
    +    */
    +   public long getStreamInactivityTimeout() {
    +           return TimeUnit.NANOSECONDS.toMillis(streamInactivityTimeoutNanos);
    +   }
    +
    +   /**
    +    * Gets the total number of open streams (input plus output).
    +    */
    +   public int getTotalNumberOfOpenStreams() {
    +           lock.lock();
    +           try {
    +                   return numReservedOutputStreams + 
numReservedInputStreams;
    +           } finally {
    +                   lock.unlock();
    +           }
    +   }
    +
    +   /**
    +    * Gets the number of currently open output streams.
    +    *
    +    * <p>This is the reserved output stream counter, read while holding the lock.
    +    */
    +   public int getNumberOfOpenOutputStreams() {
    +           final int reservedOutputs;
    +           lock.lock();
    +           try {
    +                   reservedOutputs = numReservedOutputStreams;
    +           }
    +           finally {
    +                   lock.unlock();
    +           }
    +           return reservedOutputs;
    +   }
    +
    +   /**
    +    * Gets the number of currently open input streams.
    +    */
    +   public int getNumberOfOpenInputStreams() {
    +           return numReservedInputStreams;
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  input & output stream opening methods
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Creates an output stream on the wrapped file system, subject to the stream limits.
    +    * The actual {@code create} call on the wrapped file system is deferred into a supplier
    +    * that is handed to {@code createOutputStream}.
    +    */
    +   @Override
    +   public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
    +           final SupplierWithException<FSDataOutputStream, IOException> opener =
    +                           () -> originalFs.create(f, overwriteMode);
    +           return createOutputStream(opener);
    +   }
    +
    +   /**
    +    * Deprecated variant of stream creation. Forwards all arguments to the method of the same
    +    * signature on the wrapped file system, with the call deferred into a supplier that is
    +    * handed to {@code createOutputStream}.
    +    */
    +   @Override
    +   @Deprecated
    +   @SuppressWarnings("deprecation")
    +   public FSDataOutputStream create(
    +                   Path f,
    +                   boolean overwrite,
    +                   int bufferSize,
    +                   short replication,
    +                   long blockSize) throws IOException {
    +
    +           final SupplierWithException<FSDataOutputStream, IOException> opener =
    +                           () -> originalFs.create(f, overwrite, bufferSize, replication, blockSize);
    +           return createOutputStream(opener);
    +   }
    +
    +   /**
    +    * Opens an input stream with the given buffer size on the wrapped file system, subject
    +    * to the stream limits. The actual {@code open} call is deferred into a supplier that is
    +    * handed to {@code createInputStream}.
    +    */
    +   @Override
    +   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    +           final SupplierWithException<FSDataInputStream, IOException> opener =
    +                           () -> originalFs.open(f, bufferSize);
    +           return createInputStream(opener);
    +   }
    +
    +   /**
    +    * Opens an input stream on the wrapped file system, subject to the stream limits.
    +    * The actual {@code open} call is deferred into a supplier that is handed to
    +    * {@code createInputStream}.
    +    */
    +   @Override
    +   public FSDataInputStream open(Path f) throws IOException {
    +           final SupplierWithException<FSDataInputStream, IOException> opener =
    +                           () -> originalFs.open(f);
    +           return createInputStream(opener);
    +   }
    +
    +   /**
    +    * Opens an output stream via the given opener and wraps it into an {@code OutStream}
    +    * bound to this file system. The wrapped stream is tracked in the set of open output
    +    * streams and counted against the output quota ({@code output = true}).
    +    */
    +   private FSDataOutputStream createOutputStream(
    +                   final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {
    +
    +           return this.<OutStream>createStream(
    +                           () -> new OutStream(streamOpener.get(), this),
    +                           openOutputStreams,
    +                           true);
    +   }
    +
    +   /**
    +    * Opens an input stream via the given opener and wraps it into an {@code InStream}
    +    * bound to this file system. The wrapped stream is tracked in the set of open input
    +    * streams and counted against the input quota ({@code output = false}).
    +    */
    +   private FSDataInputStream createInputStream(
    +                   final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {
    +
    +           return this.<InStream>createStream(
    +                           () -> new InStream(streamOpener.get(), this),
    +                           openInputStreams,
    +                           false);
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  other delegating file system methods
    +   // 
------------------------------------------------------------------------
    +
    +   /** Returns the kind reported by the wrapped file system; no limiting is involved. */
    +   @Override
    +   public FileSystemKind getKind() {
    +           final FileSystemKind kind = originalFs.getKind();
    +           return kind;
    +   }
    +
    +   /** Returns whatever the wrapped file system reports for {@code isDistributedFS()}. */
    +   @Override
    +   public boolean isDistributedFS() {
    +           final boolean distributed = originalFs.isDistributedFS();
    +           return distributed;
    +   }
    +
    +   /** Returns the working directory of the wrapped file system. */
    +   @Override
    +   public Path getWorkingDirectory() {
    +           final Path workingDirectory = originalFs.getWorkingDirectory();
    +           return workingDirectory;
    +   }
    +
    +   /** Returns the home directory of the wrapped file system. */
    +   @Override
    +   public Path getHomeDirectory() {
    +           final Path homeDirectory = originalFs.getHomeDirectory();
    +           return homeDirectory;
    +   }
    +
    +   /** Returns the URI of the wrapped file system. */
    +   @Override
    +   public URI getUri() {
    +           final URI uri = originalFs.getUri();
    +           return uri;
    +   }
    +
    +   /** Forwards the file status lookup for the given path to the wrapped file system. */
    +   @Override
    +   public FileStatus getFileStatus(Path f) throws IOException {
    +           final FileStatus status = originalFs.getFileStatus(f);
    +           return status;
    +   }
    +
    +   /** Forwards the block location lookup, with unchanged arguments, to the wrapped file system. */
    +   @Override
    +   public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
    +           final BlockLocation[] locations = originalFs.getFileBlockLocations(file, start, len);
    +           return locations;
    +   }
    +
    +   /** Forwards the status listing for the given path to the wrapped file system. */
    +   @Override
    +   public FileStatus[] listStatus(Path f) throws IOException {
    +           final FileStatus[] statuses = originalFs.listStatus(f);
    +           return statuses;
    +   }
    +
    +   /** Forwards the delete request, with unchanged arguments, to the wrapped file system. */
    +   @Override
    +   public boolean delete(Path f, boolean recursive) throws IOException {
    +           final boolean result = originalFs.delete(f, recursive);
    +           return result;
    +   }
    +
    +   /** Forwards the directory creation request to the wrapped file system. */
    +   @Override
    +   public boolean mkdirs(Path f) throws IOException {
    +           final boolean result = originalFs.mkdirs(f);
    +           return result;
    +   }
    +
    +   /** Forwards the rename request, with unchanged arguments, to the wrapped file system. */
    +   @Override
    +   public boolean rename(Path src, Path dst) throws IOException {
    +           final boolean result = originalFs.rename(src, dst);
    +           return result;
    +   }
    +
    +   /** Forwards the existence check for the given path to the wrapped file system. */
    +   @Override
    +   public boolean exists(Path f) throws IOException {
    +           final boolean result = originalFs.exists(f);
    +           return result;
    +   }
    +
    +   /** Deprecated accessor; returns the default block size reported by the wrapped file system. */
    +   @Override
    +   @Deprecated
    +   @SuppressWarnings("deprecation")
    +   public long getDefaultBlockSize() {
    +           final long blockSize = originalFs.getDefaultBlockSize();
    +           return blockSize;
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Opens a stream through the given opener while enforcing the configured stream limits.
    +    *
    +    * <p>The method works in three steps:
    +    * <ol>
    +    *   <li>Under the lock, wait until the limits permit another stream and reserve one
    +    *       "credit" in the matching reserved-streams counter.</li>
    +    *   <li>Outside the lock, invoke the opener (opening may be slow).</li>
    +    *   <li>On success, add the stream to {@code openStreams} under the lock. If the opener
    +    *       fails, give the credit back and signal waiting threads.</li>
    +    * </ol>
    +    *
    +    * @param streamOpener Opens (and wraps) the actual stream.
    +    * @param openStreams  The set in which the opened stream is registered.
    +    * @param output       True if an output stream is opened, false for an input stream.
    +    * @return The opened stream.
    +    * @throws IOException If the opener fails, if waiting for availability fails, or if the
    +    *                     thread is interrupted while acquiring the lock or waiting (the
    +    *                     interruption flag is restored in that case).
    +    */
    +   private <T extends StreamWithTimeout> T createStream(
    +                   final SupplierWithException<T, IOException> streamOpener,
    +                   final HashSet<T> openStreams,
    +                   final boolean output) throws IOException {
    +
    +           // A configured limit of 0 means "no limit". Only the limit of the requested
    +           // direction applies; the other direction is left unconstrained for this call.
    +           // Each limit must be gated on its own configuration value.
    +           final int outputLimit = output && maxNumOpenOutputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
    +           final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
    +           final int totalLimit = maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
    +           final int outputCredit = output ? 1 : 0;
    +           final int inputCredit = output ? 0 : 1;
    +
    +           // because waiting for availability may take long, we need to be interruptible here
    +           // and handle interrupted exceptions as I/O errors
    +           // even though the code is written to make sure the lock is held for a short time only,
    +           // making the lock acquisition interruptible helps to guard against the cases where
    +           // a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
    +           try {
    +                   lock.lockInterruptibly();
    +                   try {
    +                           // some integrity checks
    +                           assert openOutputStreams.size() <= numReservedOutputStreams;
    +                           assert openInputStreams.size() <= numReservedInputStreams;
    +
    +                           // wait until there are few enough streams so we can open another
    +                           waitForAvailability(totalLimit, outputLimit, inputLimit);
    +
    +                           // We do not open the stream here in the locked scope because opening a stream
    +                           // could take a while. Holding the lock during that operation would block all concurrent
    +                           // attempts to try and open a stream, effectively serializing all calls to open the streams.
    +                           numReservedOutputStreams += outputCredit;
    +                           numReservedInputStreams += inputCredit;
    +                   }
    +                   finally {
    +                           lock.unlock();
    +                   }
    +           }
    +           catch (InterruptedException e) {
    +                   // restore interruption flag
    +                   Thread.currentThread().interrupt();
    +                   // keep the original exception as the cause for diagnostics
    +                   throw new IOException("interrupted before opening stream", e);
    +           }
    +
    +           // open the stream outside the lock.
    +           boolean success = false;
    +           try {
    +                   final T out = streamOpener.get();
    +
    +                   // add the stream to the set, need to re-acquire the lock
    +                   lock.lock();
    +                   try {
    +                           openStreams.add(out);
    +                   } finally {
    +                           lock.unlock();
    +                   }
    +
    +                   // good, can now return cleanly
    +                   success = true;
    +                   return out;
    +           }
    +           finally {
    +                   if (!success) {
    +                           // remove the reserved credit
    +                           // we must acquire the lock non-interruptibly, because this must succeed!
    +                           lock.lock();
    +                           try {
    +                                   numReservedOutputStreams -= outputCredit;
    +                                   numReservedInputStreams -= inputCredit;
    +                                   available.signalAll();
    +                           } finally {
    +                                   lock.unlock();
    +                           }
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Waits until the limits permit opening another stream, or until the stream-opening
    +    * timeout has elapsed. Must be called while holding the lock.
    +    *
    +    * <p>If an inactivity timeout is configured, the waiting thread periodically checks the
    +    * open streams and closes one that has been inactive for too long, instead of only
    +    * waiting for a signal.
    +    *
    +    * <p>Timing uses the elapsed time since the start of the wait rather than an absolute
    +    * deadline. {@link System#nanoTime()} has an arbitrary origin and may return negative
    +    * values, so comparing or subtracting absolute deadlines could overflow.
    +    *
    +    * @param totalLimit  The limit for the total number of reserved streams.
    +    * @param outputLimit The limit for the number of reserved output streams.
    +    * @param inputLimit  The limit for the number of reserved input streams.
    +    * @throws InterruptedException If the thread is interrupted while waiting.
    +    * @throws IOException          If the timeout elapsed and there is still no availability.
    +    */
    +   @GuardedBy("lock")
    +   private void waitForAvailability(
    +                   int totalLimit,
    +                   int outputLimit,
    +                   int inputLimit) throws InterruptedException, IOException {
    +
    +           checkState(lock.isHeldByCurrentThread());
    +
    +           // a configured timeout of 0 means "wait indefinitely"
    +           final long timeoutNanos = streamOpenTimeoutNanos == 0 ? Long.MAX_VALUE : streamOpenTimeoutNanos;
    +           final long startNanos = System.nanoTime();
    +
    +           // wait for available connections
    +           // timeLeft = timeout - elapsed; elapsed is a difference of two nanoTime() values and
    +           // thus non-negative, so this subtraction cannot overflow
    +           long timeLeft;
    +
    +           if (streamInactivityTimeoutNanos == 0) {
    +                   // simple case: just wait
    +                   while ((timeLeft = timeoutNanos - (System.nanoTime() - startNanos)) > 0 &&
    +                                   !hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +                           available.await(timeLeft, TimeUnit.NANOSECONDS);
    +                   }
    +           }
    +           else {
    +                   // complex case: chase down inactive streams
    +                   final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;
    +
    +                   long now;
    +                   while ((timeLeft = timeoutNanos - ((now = System.nanoTime()) - startNanos)) > 0 && // while still within timeout
    +                                   !hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +
    +                           // check all streams whether there is one that has been inactive for too long
    +                           if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams, now))) {
    +                                   // only wait if we did not manage to close any stream.
    +                                   // otherwise eagerly check again if we have availability now (we should have!)
    +                                   long timeToWait = Math.min(checkIntervalNanos, timeLeft);
    +                                   available.await(timeToWait, TimeUnit.NANOSECONDS);
    +                           }
    +                   }
    +           }
    +
    +           // check for timeout
    +           // we check availability again to catch cases where the timeout expired while waiting
    +           // to re-acquire the lock
    +           if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit)) {
    +                   throw new IOException(String.format(
    +                                   "Timeout while waiting for an available stream/connect. " +
    +                                   "limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms",
    +                                   maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams,
    +                                   numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout()));
    +           }
    +   }
    +
    +   @GuardedBy("lock")
    +   private boolean hasAvailability(int totalLimit, int outputLimit, int 
inputLimit) {
    +           return numReservedOutputStreams < outputLimit &&
    +                           numReservedInputStreams < inputLimit &&
    +                           numReservedOutputStreams + 
numReservedInputStreams < totalLimit;
    +   }
    +
    +   /**
    +    * Scans the given streams and closes the first one found to be inactive.
    +    *
    +    * <p>A stream is skipped if it is already closed, or if its last inactivity check was less
    +    * than the inactivity timeout ago. Skipping a stream does not end the scan; the remaining
    +    * streams are still examined. A stream that is due for a check and reports no new bytes
    +    * since its previous check is closed due to timeout.
    +    *
    +    * @param streams  The streams to examine.
    +    * @param nowNanos The current {@code System.nanoTime()} value, used as the check timestamp.
    +    * @return True if a stream was closed as inactive, false otherwise.
    +    */
    +   @GuardedBy("lock")
    +   private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
    +           for (StreamWithTimeout stream : streams) {
    +                   try {
    +                           // If the stream is closed already, it will be removed anyways, so we
    +                           // do not classify it as inactive.
    +                           if (stream.isClosed()) {
    +                                   continue;
    +                           }
    +
    +                           // We also skip the check if another check happened too recently.
    +                           // Compare the elapsed time (a difference) rather than 'last + timeout',
    +                           // which can overflow for very large timeouts.
    +                           if (nowNanos - stream.getLastCheckTimestampNanos() < streamInactivityTimeoutNanos) {
    +                                   // interval since last check not yet over
    +                                   continue;
    +                           }
    +
    +                           if (!stream.checkNewBytesAndMark(nowNanos)) {
    +                                   stream.closeDueToTimeout();
    +                                   return true;
    +                           }
    +                   }
    +                   catch (StreamTimeoutException ignored) {
    +                           // may happen due to races
    +                   }
    +                   catch (IOException e) {
    +                           // only log on debug level here, to avoid log spamming
    +                           LOG.debug("Could not check for stream progress to determine inactivity", e);
    +                   }
    +           }
    +
    +           return false;
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Atomically removes the given output stream from the set of currently 
open output streams,
    +    * and signals that new stream can now be opened.
    +    */
    +   void unregisterOutputStream(OutStream stream) {
    +           lock.lock();
    +           try {
    +                   // only decrement if we actually remove the stream
    +                   if (openOutputStreams.remove(stream)) {
    +                           numReservedOutputStreams--;
    +                           available.signalAll();
    +                   }
    +           }
    +           finally {
    +                   lock.unlock();
    +           }
    +   }
    +
    +   /**
    +    * Atomically removes the given input stream from the set of currently 
open input streams,
    +    * and signals that new stream can now be opened.
    +    */
    +   void unregisterInputStream(InStream stream) {
    +           lock.lock();
    +           try {
    +                   // only decrement if we actually remove the stream
    +                   if (openInputStreams.remove(stream)) {
    +                           numReservedInputStreams--;
    +                           available.signalAll();
    +                   }
    +           }
    +           finally {
    +                   lock.unlock();
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * An {@link IOException} subtype that marks a stream as having been closed because of
    +    * the inactivity timeout.
    +    */
    +   public static final class StreamTimeoutException extends IOException {
    +
    +           private static final long serialVersionUID = -8790922066795901928L;
    +
    +           /** The fixed message carried by every freshly created timeout exception. */
    +           private static final String TIMEOUT_MESSAGE =
    +                           "Stream closed due to inactivity timeout. " +
    +                           "This is done to prevent inactive streams from blocking the full " +
    +                           "pool of limited connections";
    +
    +           /** Creates the exception with the standard inactivity-timeout message. */
    +           public StreamTimeoutException() {
    +                   super(TIMEOUT_MESSAGE);
    +           }
    +
    +           /**
    +            * Creates a new exception that reuses the message of the given exception and
    +            * records that exception as its cause.
    +            */
    +           public StreamTimeoutException(StreamTimeoutException other) {
    +                   super(other.getMessage(), other);
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Contract for streams whose progress can be monitored, so that streams without
    +    * progress can be detected and closed as inactive.
    +    */
    +   private interface StreamWithTimeout extends Closeable {
    +
    +           /**
    +            * Tells whether the stream was closed already.
    +            */
    +           boolean isClosed();
    +
    +           /**
    +            * Returns the timestamp at which the last inactivity evaluation was made.
    +            */
    +           long getLastCheckTimestampNanos();
    +
    +           /**
    +            * Checks whether new bytes were seen since the previous invocation of this method,
    +            * and records the given timestamp so that it can afterwards be read via
    +            * {@link #getLastCheckTimestampNanos()}.
    +            *
    +            * @param timestamp The timestamp to record for this check.
    +            * @return True, if there were new bytes, false if not.
    +            */
    +           boolean checkNewBytesAndMark(long timestamp) throws IOException;
    +
    +           /**
    +            * Closes the stream asynchronously with a special exception that indicates closing
    +            * due to lack of progress.
    +            */
    +           void closeDueToTimeout() throws IOException;
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * A data output stream that wraps a given data output stream and 
un-registers
    +    * from a given connection-limiting file system
    +    * (via {@link 
LimitedConnectionsFileSystem#unregisterOutputStream(OutStream)}
    +    * upon closing.
    +    */
    +   private static final class OutStream extends FSDataOutputStream 
implements StreamWithTimeout {
    +
    +           /** The original data output stream to write to. */
    +           private final FSDataOutputStream originalStream;
    +
    +           /** The connection-limiting file system to un-register from. */
    +           private final LimitedConnectionsFileSystem fs;
    +
    +           /** An exception with which the stream has been externally 
closed. */
    +           private volatile StreamTimeoutException timeoutException;
    +
    +           /** The number of bytes written the last time that the {@link 
#checkNewBytesAndMark(long)}
    +            * method was called. It is important to initialize this with 
{@code -1} so that the
    +            * first check (0 bytes) always appears to have made progress. 
*/
    +           private volatile long lastCheckBytes = -1;
    +
    +           /** The timestamp when the last inactivity evaluation was made. 
*/
    +           private volatile long lastCheckTimestampNanos;
    +
    +           /** Flag tracking whether the stream was already closed, for 
proper inactivity tracking. */
    +           private AtomicBoolean closed = new AtomicBoolean();
    +
    +           OutStream(
    +                           FSDataOutputStream originalStream,
    +                           LimitedConnectionsFileSystem fs) {
    +
    +                   this.originalStream = checkNotNull(originalStream);
    +                   this.fs = checkNotNull(fs);
    +           }
    +
    +           // --- FSDataOutputStream API implementation
    +
    +           @Override
    +           public void write(int b) throws IOException {
    +                   try {
    +                           originalStream.write(b);
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                   }
    +           }
    +
    +           @Override
    +           public void write(byte[] b, int off, int len) throws 
IOException {
    +                   try {
    +                           originalStream.write(b, off, len);
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                   }
    +           }
    +
    +           @Override
    +           public long getPos() throws IOException {
    +                   try {
    +                           return originalStream.getPos();
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                           return -1; // silence the compiler
    +                   }
    +           }
    +
    +           @Override
    +           public void flush() throws IOException {
    +                   try {
    +                           originalStream.flush();
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                   }
    +           }
    +
    +           @Override
    +           public void sync() throws IOException {
    +                   try {
    +                           originalStream.sync();
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                   }
    +           }
    +
    +           @Override
    +           public void close() throws IOException {
    +                   if (closed.compareAndSet(false, true)) {
    +                           try {
    +                                   originalStream.close();
    +                           }
    +                           catch (IOException e) {
    +                                   handleIOException(e);
    +                           }
    +                           finally {
    +                                   fs.unregisterOutputStream(this);
    +                           }
    +                   }
    +           }
    +
    +           @Override
    +           public void closeDueToTimeout() throws IOException {
    +                   this.timeoutException = new StreamTimeoutException();
    +                   close();
    +           }
    +
    +           @Override
    +           public boolean isClosed() {
    +                   return closed.get();
    +           }
    +
    +           @Override
    +           public long getLastCheckTimestampNanos() {
    +                   return lastCheckTimestampNanos;
    +           }
    +
    +           @Override
    +           public boolean checkNewBytesAndMark(long timestamp) throws 
IOException {
    +                   // remember the time when checked
    +                   lastCheckTimestampNanos = timestamp;
    +
    +                   final long bytesNow = originalStream.getPos();
    +                   if (bytesNow > lastCheckBytes) {
    +                           lastCheckBytes = bytesNow;
    +                           return true;
    +                   }
    +                   else {
    +                           return false;
    +                   }
    +           }
    +
    +           private void handleIOException(IOException exception) throws 
IOException {
    +                   if (timeoutException == null) {
    +                           throw exception;
    +                   } else {
    +                           // throw a new exception to capture this call's 
stack trace
    +                           throw new 
StreamTimeoutException(timeoutException);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * A data input stream that wraps a given data input stream and 
un-registers
    +    * from a given connection-limiting file system
    +    * (via {@link 
LimitedConnectionsFileSystem#unregisterInputStream(InStream)})
    +    * upon closing.
    +    */
    +   private static final class InStream extends FSDataInputStream 
implements StreamWithTimeout {
    +
    +           /** The original data input stream to read from. */
    +           private final FSDataInputStream originalStream;
    +
    +           /** The connection-limiting file system to un-register from. */
    +           private final LimitedConnectionsFileSystem fs;
    +
    +           /** An exception with which the stream has been externally 
closed. */
    +           private volatile StreamTimeoutException timeoutException;
    +
    +           /** The number of bytes read the last time that the {@link 
#checkNewBytesAndMark(long)}
    +            * method was called. It is important to initialize this with 
{@code -1} so that the
    +            * first check (0 bytes) always appears to have made progress. 
*/
    +           private volatile long lastCheckBytes = -1;
    +
    +           /** The timestamp when the last inactivity evaluation was made. 
*/
    +           private volatile long lastCheckTimestampNanos;
    +
    +           /** Flag tracking whether the stream was already closed, for 
proper inactivity tracking. */
    +           private AtomicBoolean closed = new AtomicBoolean();
    +
    +           InStream(
    +                           FSDataInputStream originalStream,
    +                           LimitedConnectionsFileSystem fs) {
    +
    +                   this.originalStream = checkNotNull(originalStream);
    +                   this.fs = checkNotNull(fs);
    +           }
    +
    +           // --- FSDataInputStream API implementation
    +
    +           @Override
    +           public int read() throws IOException {
    +                   try {
    +                           return originalStream.read();
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                           return 0; // silence the compiler
    +                   }
    +           }
    +
    +           @Override
    +           public int read(byte[] b) throws IOException {
    +                   try {
    +                           return originalStream.read(b);
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                           return 0; // silence the compiler
    +                   }
    +           }
    +
    +           @Override
    +           public int read(byte[] b, int off, int len) throws IOException {
    +                   try {
    +                           return originalStream.read(b, off, len);
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                           return 0; // silence the compiler
    +                   }
    +           }
    +
    +           @Override
    +           public long skip(long n) throws IOException {
    +                   try {
    +                           return originalStream.skip(n);
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                           return 0L; // silence the compiler
    +                   }
    +           }
    +
    +           @Override
    +           public int available() throws IOException {
    +                   try {
    +                           return originalStream.available();
    +                   }
    +                   catch (IOException e) {
    +                           handleIOException(e);
    +                           return 0; // silence the compiler
    +                   }
    +           }
    +
    +           @Override
    +           public void mark(int readlimit) {
    +                   originalStream.mark(readlimit);
    +           }
    +
    +           @Override
    +           public void reset() throws IOException {
    +                   originalStream.reset();
    --- End diff --
    
    Nope, that is simply an oversight. Will fix that...


---

Reply via email to