Repository: flink
Updated Branches:
  refs/heads/master 0cb7706da -> 95eadfe15


[FLINK-9755][network] forward exceptions in 
RemoteInputChannel#notifyBufferAvailable() to the responsible thread

This mainly involves state checks, but previously these exceptions were only swallowed
without re-registration or any other logging/handling. This may have led to
a thread stalling while waiting for a notification that never came.

This closes #6272.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5857f554
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5857f554
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5857f554

Branch: refs/heads/master
Commit: 5857f5543a7d9d3082d2f74342758d5a452a3c13
Parents: 0cb7706
Author: Nico Kruber <[email protected]>
Authored: Thu Jul 5 00:48:33 2018 +0200
Committer: Nico Kruber <[email protected]>
Committed: Thu Jul 19 17:01:19 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/BufferListener.java       |  9 ++
 .../io/network/buffer/LocalBufferPool.java      | 36 ++------
 .../partition/consumer/RemoteInputChannel.java  | 52 ++++++-----
 .../consumer/RemoteInputChannelTest.java        | 90 ++++++++++++++++++--
 4 files changed, 131 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5857f554/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
index 05b4156..4cc32c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
@@ -27,6 +27,15 @@ public interface BufferListener {
        /**
         * Notification callback if a buffer is recycled and becomes available 
in buffer pool.
         *
+        * <p>Note: responsibility for recycling the given buffer is transferred 
to this implementation,
+        * including any errors that lead to exceptions being thrown!
+        *
+        * <p><strong>BEWARE:</strong> since this may be called from outside 
the thread that relies on
+        * the listener's logic, any exception that occurs within this handler 
should be forwarded to the
+        * responsible thread for handling and otherwise ignored in the 
processing of this method. The
+        * buffer pool forwards any {@link Throwable} from here upwards to a 
potentially unrelated call
+        * stack!
+        *
         * @param buffer buffer that becomes available in buffer pool.
         * @return true if the listener wants to be notified next time.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/5857f554/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index e874723..1596fde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -262,8 +262,7 @@ class LocalBufferPool implements BufferPool {
                        if (isDestroyed || numberOfRequestedMemorySegments > 
currentPoolSize) {
                                returnMemorySegment(segment);
                                return;
-                       }
-                       else {
+                       } else {
                                listener = registeredListeners.poll();
 
                                if (listener == null) {
@@ -277,37 +276,18 @@ class LocalBufferPool implements BufferPool {
                // We do not know which locks have been acquired before the 
recycle() or are needed in the
                // notification and which other threads also access them.
                // -> call notifyBufferAvailable() outside of the synchronized 
block to avoid a deadlock (FLINK-9676)
-               boolean success = false;
-               boolean needMoreBuffers = false;
-               try {
-                       needMoreBuffers = listener.notifyBufferAvailable(new 
NetworkBuffer(segment, this));
-                       success = true;
-               } catch (Throwable ignored) {
-                       // handled below, under the lock
-               }
+               // Note that in case of any exceptions notifyBufferAvailable() 
should recycle the buffer
+               // (either directly or later during error handling) and 
therefore eventually end up in this
+               // method again.
+               boolean needMoreBuffers = listener.notifyBufferAvailable(new 
NetworkBuffer(segment, this));
 
-               if (!success || needMoreBuffers) {
+               if (needMoreBuffers) {
                        synchronized (availableMemorySegments) {
                                if (isDestroyed) {
                                        // cleanup tasks how they would have 
been done if we only had one synchronized block
-                                       if (needMoreBuffers) {
-                                               
listener.notifyBufferDestroyed();
-                                       }
-                                       if (!success) {
-                                               returnMemorySegment(segment);
-                                       }
+                                       listener.notifyBufferDestroyed();
                                } else {
-                                       if (needMoreBuffers) {
-                                               
registeredListeners.add(listener);
-                                       }
-                                       if (!success) {
-                                               if 
(numberOfRequestedMemorySegments > currentPoolSize) {
-                                                       
returnMemorySegment(segment);
-                                               } else {
-                                                       
availableMemorySegments.add(segment);
-                                                       
availableMemorySegments.notify();
-                                               }
-                                       }
+                                       registeredListeners.add(listener);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5857f554/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 0f70d44..b94f48a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -360,32 +360,44 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                        return false;
                }
 
-               boolean needMoreBuffers = false;
-               synchronized (bufferQueue) {
-                       checkState(isWaitingForFloatingBuffers, "This channel 
should be waiting for floating buffers.");
+               boolean recycleBuffer = true;
+               try {
+                       boolean needMoreBuffers = false;
+                       synchronized (bufferQueue) {
+                               checkState(isWaitingForFloatingBuffers,
+                                       "This channel should be waiting for 
floating buffers.");
+
+                               // Important: double check the isReleased state 
inside synchronized block, so there is no
+                               // race condition when notifyBufferAvailable 
and releaseAllResources running in parallel.
+                               if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+                                       isWaitingForFloatingBuffers = false;
+                                       recycleBuffer = false; // just in case
+                                       buffer.recycleBuffer();
+                                       return false;
+                               }
 
-                       // Important: double check the isReleased state inside 
synchronized block, so there is no
-                       // race condition when notifyBufferAvailable and 
releaseAllResources running in parallel.
-                       if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
-                               isWaitingForFloatingBuffers = false;
-                               buffer.recycleBuffer();
-                               return false;
-                       }
+                               recycleBuffer = false;
+                               bufferQueue.addFloatingBuffer(buffer);
 
-                       bufferQueue.addFloatingBuffer(buffer);
+                               if (bufferQueue.getAvailableBufferSize() == 
numRequiredBuffers) {
+                                       isWaitingForFloatingBuffers = false;
+                               } else {
+                                       needMoreBuffers = true;
+                               }
 
-                       if (bufferQueue.getAvailableBufferSize() == 
numRequiredBuffers) {
-                               isWaitingForFloatingBuffers = false;
-                       } else {
-                               needMoreBuffers =  true;
+                               if (unannouncedCredit.getAndAdd(1) == 0) {
+                                       notifyCreditAvailable();
+                               }
                        }
-               }
 
-               if (unannouncedCredit.getAndAdd(1) == 0) {
-                       notifyCreditAvailable();
+                       return needMoreBuffers;
+               } catch (Throwable t) {
+                       if (recycleBuffer) {
+                               buffer.recycleBuffer();
+                       }
+                       setError(t);
+                       return false;
                }
-
-               return needMoreBuffers;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5857f554/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 6c6fd96..6305492 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -52,9 +52,13 @@ import java.util.concurrent.Future;
 
 import scala.Tuple2;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -452,7 +456,7 @@ public class RemoteInputChannelTest {
                } catch (Throwable t) {
                        thrown = t;
                } finally {
-                       cleanup(networkBufferPool, null, thrown, inputChannel);
+                       cleanup(networkBufferPool, null, null, thrown, 
inputChannel);
                }
        }
 
@@ -528,7 +532,7 @@ public class RemoteInputChannelTest {
                } catch (Throwable t) {
                        thrown = t;
                } finally {
-                       cleanup(networkBufferPool, null, thrown, inputChannel);
+                       cleanup(networkBufferPool, null, null, thrown, 
inputChannel);
                }
        }
 
@@ -618,7 +622,7 @@ public class RemoteInputChannelTest {
                } catch (Throwable t) {
                        thrown = t;
                } finally {
-                       cleanup(networkBufferPool, null, thrown, inputChannel);
+                       cleanup(networkBufferPool, null, null, thrown, 
inputChannel);
                }
        }
 
@@ -687,7 +691,72 @@ public class RemoteInputChannelTest {
                } catch (Throwable t) {
                        thrown = t;
                } finally {
-                       cleanup(networkBufferPool, null, thrown, channel1, 
channel2, channel3);
+                       cleanup(networkBufferPool, null, null, thrown, 
channel1, channel2, channel3);
+               }
+       }
+
+       /**
+        * Tests that failures are propagated correctly if
+        * {@link RemoteInputChannel#notifyBufferAvailable(Buffer)} throws an 
exception. Also tests that
+        * a second listener will be notified in this case.
+        */
+       @Test
+       public void testFailureInNotifyBufferAvailable() throws Exception {
+               // Setup
+               final int numExclusiveBuffers = 0;
+               final int numFloatingBuffers = 1;
+               final int numTotalBuffers = numExclusiveBuffers + 
numFloatingBuffers;
+               final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(
+                       numTotalBuffers, 32);
+
+               final SingleInputGate inputGate = createSingleInputGate();
+               final RemoteInputChannel successfulRemoteIC = 
createRemoteInputChannel(inputGate);
+               
inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), 
successfulRemoteIC);
+
+               successfulRemoteIC.requestSubpartition(0);
+
+               // late creation -> no exclusive buffers, also no requested 
subpartition in failingRemoteIC (unlike successfulRemoteIC)
+               // (to trigger a failure in 
RemoteInputChannel#notifyBufferAvailable())
+               final RemoteInputChannel failingRemoteIC = 
createRemoteInputChannel(inputGate);
+               
inputGate.setInputChannel(failingRemoteIC.partitionId.getPartitionId(), 
failingRemoteIC);
+
+               Buffer buffer = null;
+               Throwable thrown = null;
+               try {
+                       final BufferPool bufferPool =
+                               
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
+                       inputGate.setBufferPool(bufferPool);
+
+                       buffer = bufferPool.requestBufferBlocking();
+
+                       // trigger subscription to buffer pool
+                       failingRemoteIC.onSenderBacklog(1);
+                       successfulRemoteIC.onSenderBacklog(numExclusiveBuffers 
+ 1);
+                       // recycling will call 
RemoteInputChannel#notifyBufferAvailable() which will fail and
+                       // this exception will be swallowed and set as an error 
in failingRemoteIC
+                       buffer.recycleBuffer();
+                       buffer = null;
+                       try {
+                               failingRemoteIC.checkError();
+                               fail("The input channel should have an error 
based on the failure in RemoteInputChannel#notifyBufferAvailable()");
+                       } catch (IOException e) {
+                               assertThat(e, hasProperty("cause", 
isA(IllegalStateException.class)));
+                       }
+                       // currently, the buffer is still enqueued in the 
bufferQueue of failingRemoteIC
+                       assertEquals(0, 
bufferPool.getNumberOfAvailableMemorySegments());
+                       buffer = successfulRemoteIC.requestBuffer();
+                       assertNull("buffer should still remain in 
failingRemoteIC", buffer);
+
+                       // releasing resources in failingRemoteIC should free 
the buffer again and immediately
+                       // recycle it into successfulRemoteIC
+                       failingRemoteIC.releaseAllResources();
+                       assertEquals(0, 
bufferPool.getNumberOfAvailableMemorySegments());
+                       buffer = successfulRemoteIC.requestBuffer();
+                       assertNotNull("no buffer given to successfulRemoteIC", 
buffer);
+               } catch (Throwable t) {
+                       thrown = t;
+               } finally {
+                       cleanup(networkBufferPool, null, buffer, thrown, 
failingRemoteIC, successfulRemoteIC);
                }
        }
 
@@ -749,7 +818,7 @@ public class RemoteInputChannelTest {
                } catch (Throwable t) {
                        thrown = t;
                } finally {
-                       cleanup(networkBufferPool, executor, thrown, 
inputChannel);
+                       cleanup(networkBufferPool, executor, null, thrown, 
inputChannel);
                }
        }
 
@@ -802,7 +871,7 @@ public class RemoteInputChannelTest {
                } catch (Throwable t) {
                        thrown = t;
                } finally {
-                       cleanup(networkBufferPool, executor, thrown, 
inputChannel);
+                       cleanup(networkBufferPool, executor, null, thrown, 
inputChannel);
                }
        }
 
@@ -854,7 +923,7 @@ public class RemoteInputChannelTest {
                } catch (Throwable t) {
                        thrown = t;
                } finally {
-                       cleanup(networkBufferPool, executor, thrown, 
inputChannel);
+                       cleanup(networkBufferPool, executor, null, thrown, 
inputChannel);
                }
        }
 
@@ -936,7 +1005,7 @@ public class RemoteInputChannelTest {
                } catch (Throwable t) {
                        thrown = t;
                } finally {
-                       cleanup(networkBufferPool, executor, thrown, 
inputChannel);
+                       cleanup(networkBufferPool, executor, null, thrown, 
inputChannel);
                }
        }
 
@@ -1064,6 +1133,7 @@ public class RemoteInputChannelTest {
        private void cleanup(
                        NetworkBufferPool networkBufferPool,
                        @Nullable ExecutorService executor,
+                       @Nullable Buffer buffer,
                        @Nullable Throwable throwable,
                        InputChannel... inputChannels) throws Exception {
                for (InputChannel inputChannel : inputChannels) {
@@ -1074,6 +1144,10 @@ public class RemoteInputChannelTest {
                        }
                }
 
+               if (buffer != null && !buffer.isRecycled()) {
+                       buffer.recycleBuffer();
+               }
+
                try {
                        networkBufferPool.destroyAllBufferPools();
                } catch (Throwable tInner) {

Reply via email to