Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4533#discussion_r153455238
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
---
@@ -208,8 +244,79 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
+ /**
+ * Verifies that {@link RemoteInputChannel} is enqueued in the
pipeline, and
+ * {@link AddCredit} message is sent to the producer.
+ */
+ @Test
+ public void testNotifyCreditAvailable() throws Exception {
+ final CreditBasedClientHandler handler = new
CreditBasedClientHandler();
+ final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+ final RemoteInputChannel inputChannel =
createRemoteInputChannel(mock(SingleInputGate.class));
+
+ // Enqueue the input channel
+ handler.notifyCreditAvailable(inputChannel);
+
+ channel.runPendingTasks();
+
+ // Read the enqueued msg
+ Object msg1 = channel.readOutbound();
+
+ // Should notify credit
+ assertEquals(msg1.getClass(), AddCredit.class);
+ }
+
+ /**
+ * Verifies that {@link RemoteInputChannel} is enqueued in the
pipeline, but {@link AddCredit}
+ * message is not sent actually after this input channel is released.
+ */
+ @Test
+ public void testNotifyCreditAvailableAfterReleased() throws Exception {
+ final CreditBasedClientHandler handler = new
CreditBasedClientHandler();
+ final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+ final RemoteInputChannel inputChannel =
createRemoteInputChannel(mock(SingleInputGate.class));
+
+ // Enqueue the input channel then release it
+ handler.notifyCreditAvailable(inputChannel);
+ inputChannel.releaseAllResources();
+
+ channel.runPendingTasks();
+
+ // Read the enqueued msg
+ Object msg2 = channel.readOutbound();
+
+ // No need to notify credit for released input channel
+ assertEquals(msg2, null);
+ }
+
//
---------------------------------------------------------------------------------------------
+ private SingleInputGate createSingleInputGate() {
--- End diff --
please add a javadoc about the properties of the created input gate
---