GitHub user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197137205
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java ---
    @@ -267,6 +268,79 @@ public void go() throws Exception {
                testHarness.close();
        }
     
    +   /**
    +    * Test ensuring that the producer blocks if the queue limit is exceeded,
    +    * until the queue length drops below the limit;
    +    * we set a timeout because the test will not finish if the logic is broken.
    +    */
    +   @Test(timeout = 10000)
    +   public void testBackpressure() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +           producer.setQueueLimit(1);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                           new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           UserRecordResult result = mock(UserRecordResult.class);
    +           when(result.isSuccessful()).thenReturn(true);
    +
    +           CheckedThread msg1 = new CheckedThread() {
    +                   @Override
    +                   public void go() throws Exception {
    +                           testHarness.processElement(new StreamRecord<>("msg-1"));
    +                   }
    +           };
    +           msg1.start();
    +           msg1.trySync(100);
    +           assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
    --- End diff --
    
    @tzulitai In principle, yes: the assertion can fail spuriously if the call `testHarness.processElement(…)` takes more than 100 milliseconds. However, I believe this is very unlikely even on slow systems, since the operation is mostly (entirely?) CPU-bound. If test failures occur nevertheless, it would be no problem to increase the timeout for `msg1` and `msg2`.
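
    To make the trade-off concrete, here is a minimal sketch of the "start a thread, wait a bounded time, then check liveness" pattern using only the JDK. It does not use Flink's `CheckedThread` or the test harness; `BoundedWaitSketch` and `finishesWithin` are hypothetical names introduced purely for illustration, and the latch merely stands in for a producer blocked on a full queue.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration only; not part of the Flink test code.
class BoundedWaitSketch {

    /** Runs the task on a new thread and reports whether it finished within timeoutMillis (> 0). */
    public static boolean finishesWithin(Runnable task, long timeoutMillis) throws InterruptedException {
        Thread worker = new Thread(task);
        worker.setDaemon(true);      // a task that stays blocked must not keep the JVM alive
        worker.start();
        worker.join(timeoutMillis);  // bounded wait: returns after the timeout even if still running
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        // Non-blocking work: expected to finish well inside a generous bound.
        boolean fast = finishesWithin(() -> { }, 1000);

        // Work that blocks until released, standing in for a call waiting on a full queue.
        CountDownLatch release = new CountDownLatch(1);
        boolean blocked = finishesWithin(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 100);
        release.countDown();         // let the blocked worker terminate

        System.out.println("fast=" + fast + " blocked=" + blocked);
    }
}
```

    The bound only separates "finished" from "still running" reliably when it is much larger than the worst-case runtime of the non-blocking call, which is why raising the timeout is the natural fix if the check ever turns out to be flaky.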
