kl0u commented on a change in pull request #9321: [FLINK-13486][tests] Optimize 
AsyncDataStreamITCase to alleviate the …
URL: https://github.com/apache/flink/pull/9321#discussion_r310525979
 
 

 ##########
 File path: 
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
 ##########
 @@ -137,39 +123,23 @@ class AsyncDataStreamITCase extends AbstractTestBase {
 
 }
 
-class MyAsyncFunction extends AsyncFunction[Int, Int] {
-  override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit 
= {
-    Future {
-      // trigger the timeout of the even input number
-      if (input % 2 == 0) {
-        Thread.sleep(AsyncDataStreamITCase.timeout + 1000)
-      }
-
-      resultFuture.complete(Seq(input * 2))
-    } (ExecutionContext.global)
-  }
-  override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
-    resultFuture.complete(Seq(input * 3))
-  }
-}
+class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
 
-class MyRichAsyncFunction extends RichAsyncFunction[Int, Int] {
+  @transient var latch: CountDownLatch = _
 
   override def open(parameters: Configuration): Unit = {
     assertEquals(getRuntimeContext.getNumberOfParallelSubtasks, 1)
+    latch = new CountDownLatch(1)
   }
 
   override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit 
= {
     Future {
-      // trigger the timeout of the even input number
-      if (input % 2 == 0) {
-        Thread.sleep(AsyncDataStreamITCase.timeout + 1000)
-      }
-
+      latch.await()
       resultFuture.complete(Seq(input * 2))
     } (ExecutionContext.global)
   }
   override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
+    latch.countDown()
 
 Review comment:
   Here you have another race condition: after the latch is released, you 
do not know which of the two threads ("this" one or the "waiting" one that is 
now triggered) is going to complete the future first. Please move the 
`resultFuture.complete(Seq(input * 3))` before the `latch.countDown()`.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to