This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new f77f4c2  [FLINK-13486][tests] Harden AsyncDataStreamITCase to alleviate race condition
f77f4c2 is described below

commit f77f4c20ae77027c32ffb70ba01a08b16e36cdcb
Author: ifndef-SleePy <[email protected]>
AuthorDate: Mon Aug 5 22:45:53 2019 +0800

    [FLINK-13486][tests] Harden AsyncDataStreamITCase to alleviate race condition
---
 .../api/scala/AsyncDataStreamITCase.scala          | 46 ++++++++++++++--------
 1 file changed, 29 insertions(+), 17 deletions(-)

diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
index 653c982..8d53b4d 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -18,13 +18,12 @@
 
 package org.apache.flink.streaming.api.scala
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.async.RichAsyncFunction.{RichAsyncFunctionIterationRuntimeContext, RichAsyncFunctionRuntimeContext}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
-import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture, RichAsyncFunction}
+import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
 import org.apache.flink.test.util.AbstractTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -33,7 +32,6 @@ import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 
 object AsyncDataStreamITCase {
-  val timeout = 1000L
   private var testResult: mutable.ArrayBuffer[Int] = _
 }
 
@@ -55,12 +53,13 @@ class AsyncDataStreamITCase extends AbstractTestBase {
 
     val source = env.fromElements(1, 2)
 
+    val timeout = 1L
     val asyncMapped = if (ordered) {
       AsyncDataStream.orderedWait(
-        source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+        source, new AsyncFunctionWithTimeoutExpired(), timeout, TimeUnit.MILLISECONDS)
     } else {
       AsyncDataStream.unorderedWait(
-        source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+        source, new AsyncFunctionWithTimeoutExpired(), timeout, TimeUnit.MILLISECONDS)
     }
 
     executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 6))
@@ -104,7 +103,7 @@ class AsyncDataStreamITCase extends AbstractTestBase {
 
     val source = env.fromElements(1)
 
-
+    val timeout = 10000L
     val richAsyncFunction = new MyRichAsyncFunction
     val asyncMapped = AsyncDataStream
       .unorderedWait(source, richAsyncFunction, timeout, TimeUnit.MILLISECONDS)
@@ -122,6 +121,8 @@ class AsyncDataStreamITCase extends AbstractTestBase {
       (input, collector: ResultFuture[Int]) => Future {
           collector.complete(Seq(input * 2))
       }(ExecutionContext.global)
+
+    val timeout = 10000L
     val asyncMapped = if (ordered) {
       AsyncDataStream.orderedWait(source, timeout, TimeUnit.MILLISECONDS) {
         asyncFunction
@@ -137,19 +138,35 @@ class AsyncDataStreamITCase extends AbstractTestBase {
 
 }
 
-class MyAsyncFunction extends AsyncFunction[Int, Int] {
+class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
+  @transient var timeoutLatch: CountDownLatch = _
+  @transient var invokeLatch: CountDownLatch = _
+
+  override def open(parameters: Configuration): Unit = {
+    timeoutLatch = new CountDownLatch(1)
+    invokeLatch = new CountDownLatch(1)
+  }
+
   override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = {
     Future {
       // trigger the timeout of the even input number
       if (input % 2 == 0) {
-        Thread.sleep(AsyncDataStreamITCase.timeout + 1000)
+        invokeLatch.await()
+        resultFuture.complete(Seq(input * 2))
+      } else {
+        resultFuture.complete(Seq(input * 2))
+        timeoutLatch.countDown()
       }
-
-      resultFuture.complete(Seq(input * 2))
     } (ExecutionContext.global)
   }
   override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
-    resultFuture.complete(Seq(input * 3))
+    if (input % 2 == 0) {
+      resultFuture.complete(Seq(input * 3))
+      invokeLatch.countDown()
+    } else {
+      timeoutLatch.await()
+      resultFuture.complete(Seq(input * 3))
+    }
   }
 }
 
@@ -161,11 +178,6 @@ class MyRichAsyncFunction extends RichAsyncFunction[Int, Int] {
 
   override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = {
     Future {
-      // trigger the timeout of the even input number
-      if (input % 2 == 0) {
-        Thread.sleep(AsyncDataStreamITCase.timeout + 1000)
-      }
-
       resultFuture.complete(Seq(input * 2))
     } (ExecutionContext.global)
   }

Reply via email to