Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6021#discussion_r197070282
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
---
@@ -267,6 +268,79 @@ public void go() throws Exception {
testHarness.close();
}
+ /**
+ * Test ensuring that the producer blocks if the queue limit is
exceeded,
+ * until the queue length drops below the limit;
+ * we set a timeout because the test will not finish if the logic is
broken.
+ */
+ @Test(timeout = 10000)
+ public void testBackpressure() throws Throwable {
+ final DummyFlinkKinesisProducer<String> producer = new
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+ producer.setQueueLimit(1);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(producer));
+
+ testHarness.open();
+
+ UserRecordResult result = mock(UserRecordResult.class);
+ when(result.isSuccessful()).thenReturn(true);
+
+ CheckedThread msg1 = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ testHarness.processElement(new
StreamRecord<>("msg-1"));
+ }
+ };
+ msg1.start();
+ msg1.trySync(100);
+ assertFalse("Flush triggered before reaching queue limit",
msg1.isAlive());
--- End diff --
I wonder if this would introduce flakiness in the test.
@fmthoma could you elaborate a bit here?
---