kl0u commented on a change in pull request #9321: [FLINK-13486][tests] Optimize
AsyncDataStreamITCase to alleviate the …
URL: https://github.com/apache/flink/pull/9321#discussion_r310530519
##########
File path:
flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
##########
@@ -33,37 +32,39 @@ import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
object AsyncDataStreamITCase {
- val timeout = 1000L
+ val timeout = 10000L
+ val fastTimeout = 1L
private var testResult: mutable.ArrayBuffer[Int] = _
}
class AsyncDataStreamITCase extends AbstractTestBase {
@Test
- def testOrderedWait(): Unit = {
- testAsyncWait(true)
+ def testOrderedWaitTimeout(): Unit = {
+ testAsyncWaitTimeout(true)
}
@Test
- def testUnorderedWait(): Unit = {
- testAsyncWait(false)
+ def testUnorderedWaitTimeout(): Unit = {
+ testAsyncWaitTimeout(false)
}
- private def testAsyncWait(ordered: Boolean): Unit = {
+ private def testAsyncWaitTimeout(ordered: Boolean): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val source = env.fromElements(1, 2)
val asyncMapped = if (ordered) {
AsyncDataStream.orderedWait(
- source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ source, new AsyncFunctionWithTimeoutExpired(), fastTimeout,
TimeUnit.MILLISECONDS)
} else {
AsyncDataStream.unorderedWait(
- source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ source, new AsyncFunctionWithTimeoutExpired(), fastTimeout,
TimeUnit.MILLISECONDS)
}
- executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2,
6))
+ // the latch only works once, so the second element would multiply 2
Review comment:
Here you have a race condition. You assume that the order of calls will be:
1) receive 1st element
2) call `timeout()` on the 1st element.
3) receive 2nd element
This may not be the case as you may have the following ordering given that
the `timeout()` method is called by another thread:
1) receive 1st element
2) receive 2nd element
3) call `timeout()` on the 1st element.
This may lead to different results and actually if you run the test in a
loop, you will see it is still flaky.
What you could do is have two latches, one as it is now, and the other
waiting in the `asyncInvoke` when it sees the **second** element. This second
latch will be triggered by the `timeout` of the first element. I think that
this will guarantee the ordering that you want.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services