Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6091#discussion_r192316460 --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import java.util.concurrent.TimeUnit + +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._ +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.flink.test.util.AbstractTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +object AsyncDataStreamITCase { + val timeout = 1000L + private var testResult: mutable.ArrayBuffer[Int] = _ +} + +class AsyncDataStreamITCase extends AbstractTestBase { + + @Test + def testOrderedWait(): Unit = { + testAsyncWait(true) + } + + @Test + def testUnorderedWait(): Unit = { + testAsyncWait(false) + } + + private def testAsyncWait(ordered: Boolean): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + + val source = env.addSource(new SourceFunction[Int]() { + override def run(ctx: SourceFunction.SourceContext[Int]) { + ctx.collect(1) + ctx.collect(2) + } + override def cancel() {} + }) + + val asyncMapped = if (ordered) { + AsyncDataStream.orderedWait( + source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS) + } else { + AsyncDataStream.unorderedWait( + source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS) + } + + testResult = mutable.ArrayBuffer[Int]() + asyncMapped.addSink(new SinkFunction[Int]() { + override def invoke(value: Int) { + testResult += value + } + }) + + env.execute("testAsyncWait") + + val expectedResult = mutable.ArrayBuffer[Int](2, 6) + if (ordered) { + assertEquals(expectedResult, testResult) + } else { + assertEquals(expectedResult, testResult.sorted) + } + } + + @Test + def testOrderedWaitUsingAnonymousFunction(): Unit = { + testAsyncWaitUsingAnonymousFunction(true) + } + + @Test + def 
testUnorderedWaitUsingAnonymousFunction(): Unit = { + testAsyncWaitUsingAnonymousFunction(false) + } + + private def testAsyncWaitUsingAnonymousFunction(ordered: Boolean): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + + val source = env.addSource(new SourceFunction[Int]() { --- End diff -- Suggestion: use `env.fromElements(1, 2)` here instead of a hand-written anonymous `SourceFunction`.
---