Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/6272#discussion_r201247182 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -687,7 +690,65 @@ public void testFairDistributionFloatingBuffers() throws Exception { } catch (Throwable t) { thrown = t; } finally { - cleanup(networkBufferPool, null, thrown, channel1, channel2, channel3); + cleanup(networkBufferPool, null, null, thrown, channel1, channel2, channel3); + } + } + + /** + * Tests that failures are propagated correctly if + * {@link RemoteInputChannel#notifyBufferAvailable(Buffer)} throws an exception. Also tests that + * a second listener will be notified in this case. + */ + @Test + public void testFailureInNotifyBufferAvailable() throws Exception { + // Setup + final int numExclusiveBuffers = 0; + final int numFloatingBuffers = 1; + final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers; + final NetworkBufferPool networkBufferPool = new NetworkBufferPool( + numTotalBuffers, 32); + + final SingleInputGate inputGate = createSingleInputGate(); + final RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), successfulRemoteIC); + +// inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); --- End diff -- remove this annotated code?
---