This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new f77f4c2 [FLINK-13486][tests] Harden AsyncDataStreamITCase to
alleviate race condition
f77f4c2 is described below
commit f77f4c20ae77027c32ffb70ba01a08b16e36cdcb
Author: ifndef-SleePy <[email protected]>
AuthorDate: Mon Aug 5 22:45:53 2019 +0800
[FLINK-13486][tests] Harden AsyncDataStreamITCase to alleviate race
condition
---
.../api/scala/AsyncDataStreamITCase.scala | 46 ++++++++++++++--------
1 file changed, 29 insertions(+), 17 deletions(-)
diff --git
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
index 653c982..8d53b4d 100644
---
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
+++
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -18,13 +18,12 @@
package org.apache.flink.streaming.api.scala
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.flink.configuration.Configuration
-import
org.apache.flink.streaming.api.functions.async.RichAsyncFunction.{RichAsyncFunctionIterationRuntimeContext,
RichAsyncFunctionRuntimeContext}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
-import org.apache.flink.streaming.api.scala.async.{AsyncFunction,
ResultFuture, RichAsyncFunction}
+import org.apache.flink.streaming.api.scala.async.{ResultFuture,
RichAsyncFunction}
import org.apache.flink.test.util.AbstractTestBase
import org.junit.Assert._
import org.junit.Test
@@ -33,7 +32,6 @@ import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
object AsyncDataStreamITCase {
- val timeout = 1000L
private var testResult: mutable.ArrayBuffer[Int] = _
}
@@ -55,12 +53,13 @@ class AsyncDataStreamITCase extends AbstractTestBase {
val source = env.fromElements(1, 2)
+ val timeout = 1L
val asyncMapped = if (ordered) {
AsyncDataStream.orderedWait(
- source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ source, new AsyncFunctionWithTimeoutExpired(), timeout,
TimeUnit.MILLISECONDS)
} else {
AsyncDataStream.unorderedWait(
- source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ source, new AsyncFunctionWithTimeoutExpired(), timeout,
TimeUnit.MILLISECONDS)
}
executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2,
6))
@@ -104,7 +103,7 @@ class AsyncDataStreamITCase extends AbstractTestBase {
val source = env.fromElements(1)
-
+ val timeout = 10000L
val richAsyncFunction = new MyRichAsyncFunction
val asyncMapped = AsyncDataStream
.unorderedWait(source, richAsyncFunction, timeout, TimeUnit.MILLISECONDS)
@@ -122,6 +121,8 @@ class AsyncDataStreamITCase extends AbstractTestBase {
(input, collector: ResultFuture[Int]) => Future {
collector.complete(Seq(input * 2))
}(ExecutionContext.global)
+
+ val timeout = 10000L
val asyncMapped = if (ordered) {
AsyncDataStream.orderedWait(source, timeout, TimeUnit.MILLISECONDS) {
asyncFunction
@@ -137,19 +138,35 @@ class AsyncDataStreamITCase extends AbstractTestBase {
}
-class MyAsyncFunction extends AsyncFunction[Int, Int] {
+class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
+ @transient var timeoutLatch: CountDownLatch = _
+ @transient var invokeLatch: CountDownLatch = _
+
+ override def open(parameters: Configuration): Unit = {
+ timeoutLatch = new CountDownLatch(1)
+ invokeLatch = new CountDownLatch(1)
+ }
+
override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit
= {
Future {
// trigger the timeout of the even input number
if (input % 2 == 0) {
- Thread.sleep(AsyncDataStreamITCase.timeout + 1000)
+ invokeLatch.await()
+ resultFuture.complete(Seq(input * 2))
+ } else {
+ resultFuture.complete(Seq(input * 2))
+ timeoutLatch.countDown()
}
-
- resultFuture.complete(Seq(input * 2))
} (ExecutionContext.global)
}
override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
- resultFuture.complete(Seq(input * 3))
+ if (input % 2 == 0) {
+ resultFuture.complete(Seq(input * 3))
+ invokeLatch.countDown()
+ } else {
+ timeoutLatch.await()
+ resultFuture.complete(Seq(input * 3))
+ }
}
}
@@ -161,11 +178,6 @@ class MyRichAsyncFunction extends RichAsyncFunction[Int,
Int] {
override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit
= {
Future {
- // trigger the timeout of the even input number
- if (input % 2 == 0) {
- Thread.sleep(AsyncDataStreamITCase.timeout + 1000)
- }
-
resultFuture.complete(Seq(input * 2))
} (ExecutionContext.global)
}