Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4533#discussion_r144025139
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
---
@@ -206,6 +209,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
+ /**
+ * Verifies that {@link RemoteInputChannel} is enqueued in the
pipeline, and
+ * {@link AddCredit} message is sent to the producer.
+ */
+ @Test
+ public void testNotifyCreditAvailable() throws Exception {
+ final CreditBasedClientHandler handler = new
CreditBasedClientHandler();
+ final EmbeddedChannel ch = new EmbeddedChannel(handler);
+
+ final RemoteInputChannel inputChannel =
mock(RemoteInputChannel.class);
+
+ // Enqueue the remote input channel
+ handler.notifyCreditAvailable(inputChannel);
+
+ ch.runPendingTasks();
+
+ // Read the enqueued msg
+ Object msg1 = ch.readOutbound();
+
+ // Should notify credit
+ assertEquals(msg1.getClass(), AddCredit.class);
+ }
+
+ /**
+ * Verifies that {@link RemoteInputChannel} is enqueued in the
pipeline, but {@link AddCredit}
+ * message is not sent after the input channel is released.
+ */
+ @Test
+ public void testNotifyCreditAvailableAfterReleased() throws Exception {
+ final CreditBasedClientHandler handler = new
CreditBasedClientHandler();
+ final EmbeddedChannel ch = new EmbeddedChannel(handler);
--- End diff --
`channel`
---