kl0u commented on a change in pull request #9321: [FLINK-13486][tests] Optimize AsyncDataStreamITCase to alleviate the … URL: https://github.com/apache/flink/pull/9321#discussion_r310525979
########## File path: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ########## @@ -137,39 +123,23 @@ class AsyncDataStreamITCase extends AbstractTestBase { } -class MyAsyncFunction extends AsyncFunction[Int, Int] { - override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = { - Future { - // trigger the timeout of the even input number - if (input % 2 == 0) { - Thread.sleep(AsyncDataStreamITCase.timeout + 1000) - } - - resultFuture.complete(Seq(input * 2)) - } (ExecutionContext.global) - } - override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = { - resultFuture.complete(Seq(input * 3)) - } -} +class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] { -class MyRichAsyncFunction extends RichAsyncFunction[Int, Int] { + @transient var latch: CountDownLatch = _ override def open(parameters: Configuration): Unit = { assertEquals(getRuntimeContext.getNumberOfParallelSubtasks, 1) + latch = new CountDownLatch(1) } override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = { Future { - // trigger the timeout of the even input number - if (input % 2 == 0) { - Thread.sleep(AsyncDataStreamITCase.timeout + 1000) - } - + latch.await() resultFuture.complete(Seq(input * 2)) } (ExecutionContext.global) } override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = { + latch.countDown() Review comment: Here you have another race condition because after releasing the lock, you do not know which of the two threads ("this" or the "waiting" one that is not triggered) is going to complete the future. Please move the `resultFuture.complete(Seq(input * 3))` before the `latch.countDown()`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services