[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633122#comment-15633122 ]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2629#discussion_r86350872
--- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---
@@ -195,6 +202,70 @@ public Integer map(NonSerializable value) throws Exception {
         env.execute();
     }
+    @Test
+    public void testAsyncWaitOperator() throws Exception {
+        final int numElements = 10;
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)).setParallelism(1);
+
+        AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
+            transient ExecutorService executorService;
+
+            @Override
+            public void open(Configuration parameters) throws Exception {
+                super.open(parameters);
+                executorService = Executors.newFixedThreadPool(numElements);
+            }
+
+            @Override
+            public void close() throws Exception {
+                super.close();
+                executorService.shutdown();
+            }
+
+            @Override
+            public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
+                                    final AsyncCollector<Tuple2<Integer, NonSerializable>, Integer> collector) throws Exception {
+                this.executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        // wait for while to simulate async operation here
+                        int sleep = (int) (new Random().nextFloat() * 1000);
+                        try {
+                            Thread.sleep(sleep);
+                            List<Integer> ret = new ArrayList<>();
+                            ret.add(input.f0+input.f0);
+                            collector.collect(ret);
+                        }
+                        catch (InterruptedException e) {
+                            collector.collect(new ArrayList<Integer>(0));
+                        }
+                    }
+                });
+            }
+        };
+
+        DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
+        orderedResult.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
+
+        DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, 2).setParallelism(1);
+        unorderedResult.writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
--- End diff --
It is better not to write the results to disk here: a test that avoids the
file system cannot fail because of disk problems. Prefer a sink that keeps
its results in memory.
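
A minimal sketch of such an in-memory sink, written against the JDK only so that it runs on its own. The names `CollectingSink`, `invoke`, `snapshot` and `clear` are hypothetical; in the actual test the class would presumably implement Flink's `SinkFunction` and the list would be inspected after `env.execute()` returns. The list is static on the assumption that the sink instance is serialized before it reaches the task, so instance fields of the object created in the test method would not see the results.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory sink; a stand-in for a class implementing SinkFunction<Integer>.
final class CollectingSink {
    // Static and synchronized: shared by all sink instances and safe to
    // append to from several task threads of the same JVM.
    private static final List<Integer> RESULTS =
            Collections.synchronizedList(new ArrayList<Integer>());

    // Stand-in for the sink's per-record callback: remember the value.
    public void invoke(Integer value) {
        RESULTS.add(value);
    }

    // Copy of everything collected so far, for assertions in the test.
    public static List<Integer> snapshot() {
        synchronized (RESULTS) {
            return new ArrayList<>(RESULTS);
        }
    }

    // Reset between test methods, because the list outlives a single test.
    public static void clear() {
        RESULTS.clear();
    }
}
```

For the ordered variant the test could compare `snapshot()` directly with the expected list; for the unordered variant it would have to sort both sides first, since the arrival order is not fixed.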
> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Jamie Grier
> Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.
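
The behaviour described above (a bounded number of operations in flight, results handed on as they complete) can be sketched with plain JDK classes. This is an illustration under stated assumptions, not the operator from the pull request: `BoundedAsyncSketch` and its `orderedWait` method are hypothetical names, the capacity is enforced with a `Semaphore`, and results are returned in input order, which corresponds to the ordered variant only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper, not part of Flink.
final class BoundedAsyncSketch {
    // Applies asyncFn to every input while keeping at most `capacity`
    // calls in flight, then returns the results in input order.
    static <I, O> List<O> orderedWait(List<I> inputs,
                                      Function<I, CompletableFuture<O>> asyncFn,
                                      int capacity) {
        Semaphore permits = new Semaphore(capacity);
        List<CompletableFuture<O>> pending = new ArrayList<>();
        for (I input : inputs) {
            // Blocks while `capacity` calls are outstanding (back-pressure).
            permits.acquireUninterruptibly();
            CompletableFuture<O> future = asyncFn.apply(input);
            // Free the slot when the call finishes, normally or exceptionally.
            future.whenComplete((result, error) -> permits.release());
            pending.add(future);
        }
        List<O> results = new ArrayList<>();
        for (CompletableFuture<O> future : pending) {
            // join() rethrows a failed call as CompletionException.
            results.add(future.join());
        }
        return results;
    }
}
```

A call such as `BoundedAsyncSketch.orderedWait(inputs, x -> CompletableFuture.supplyAsync(() -> x + x), 2)` mirrors the capacity of 2 used in the test above. An unordered variant would emit each result from the completion callback instead of joining the futures in submission order.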