GitHub user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4499#discussion_r140833417
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
@@ -281,6 +286,73 @@ public void testProducerFailedException() throws Exception {
ch.getNextBuffer();
}
+ /**
+ * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is
+ * recycled to available buffers directly and it triggers notify of announced credit.
+ */
+ @Test
+ public void testRecycleExclusiveBufferBeforeReleased() throws Exception {
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+ final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));
+
+ // Recycle exclusive segment
+ inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel));
+
+ assertEquals("There should have one available buffer after recycle.",
+ 1, inputChannel.getNumberOfAvailableBuffers());
+ verify(inputChannel, times(1)).notifyCreditAvailable();
--- End diff --
Can you add one more
`inputChannel.recycle(MemorySegmentFactory.getFactory().allocateUnpooledSegment(1024, inputChannel))`
call, then check `inputChannel.getNumberOfAvailableBuffers()` and verify
that `notifyCreditAvailable()` is not called again?
---
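To illustrate the behaviour the comment asks the test to pin down, here is a minimal, self-contained sketch. It does not use Flink or Mockito: `ToyChannel`, its `unannouncedCredit` counter, and `getNotifyCount()` are hypothetical stand-ins, and the rule "notify only when the credit count goes from 0 to 1" is an assumption inferred from the request, not taken from the `RemoteInputChannel` source.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class RecycleNotifySketch {

    /** Hypothetical stand-in for a credit-based channel; NOT Flink's RemoteInputChannel. */
    static class ToyChannel {
        private final Queue<byte[]> availableBuffers = new ArrayDeque<>();
        private int unannouncedCredit = 0;
        private int notifyCount = 0;

        /** Returns a segment to the channel and announces credit if none was pending. */
        void recycle(byte[] segment) {
            availableBuffers.add(segment);
            // Assumed rule: notify only on the 0 -> 1 transition of pending credit.
            if (unannouncedCredit++ == 0) {
                notifyCreditAvailable();
            }
        }

        void notifyCreditAvailable() {
            notifyCount++;
        }

        int getNumberOfAvailableBuffers() {
            return availableBuffers.size();
        }

        int getNotifyCount() {
            return notifyCount;
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();

        // First recycle: one buffer becomes available and credit is announced once.
        channel.recycle(new byte[1024]);
        check(channel.getNumberOfAvailableBuffers() == 1, "one buffer after first recycle");
        check(channel.getNotifyCount() == 1, "one notification after first recycle");

        // Second recycle: the buffer count grows, but no further notification is sent.
        channel.recycle(new byte[1024]);
        check(channel.getNumberOfAvailableBuffers() == 2, "two buffers after second recycle");
        check(channel.getNotifyCount() == 1, "still one notification after second recycle");
    }
}
```

In the actual test, the second pair of checks would presumably be another `assertEquals` on `getNumberOfAvailableBuffers()` followed by a repeated `verify(inputChannel, times(1)).notifyCreditAvailable()`, since Mockito's `times(1)` counts all invocations made on the spy so far.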