[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633115#comment-15633115 ]
ASF GitHub Bot commented on FLINK-4391:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2629#discussion_r86200419
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ * <p>
+ * <pre>{@code
+ * DataStream<String> input = ...
+ * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ * </pre>
+ */
+public class AsyncDataStream {
+ public enum OutputMode { ORDERED, UNORDERED }
+
+ private static final int DEFAULT_BUFFER_SIZE = 100;
+
+ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(DataStream<IN> in,
+                                                                       AsyncFunction<IN, OUT> func,
+                                                                       int bufSize, OutputMode mode) {
--- End diff --
I know that we don't have a strict code style in Flink, but when breaking a
parameter list, could we put each parameter on its own line, double indented?
For example:
```
void foobar(
        int a,
        float b) {
    barfoo();
}
```
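Applied to the signature in the diff, that layout might look roughly like the sketch below. The class name, the stand-in types (`Iterable` and `java.util.function.Function` in place of `DataStream` and `AsyncFunction`), the return type, and the method body are all placeholders so the snippet compiles on its own; none of it is the PR's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ParameterStyleSketch {
    public enum OutputMode { ORDERED, UNORDERED }

    // Hypothetical stand-in for addOperator: one parameter per line,
    // each double indented relative to the method declaration.
    public static <IN, OUT> List<OUT> addOperator(
            Iterable<IN> in,
            Function<IN, OUT> func,
            int bufSize,
            OutputMode mode) {
        // Placeholder body: applies func synchronously; mode is unused here.
        List<OUT> out = new ArrayList<>(bufSize);
        for (IN element : in) {
            out.add(func.apply(element));
        }
        return out;
    }

    public static void main(String[] args) {
        // Call sites can break their argument lists the same way.
        List<Integer> lengths = addOperator(
                Arrays.asList("a", "bb"),
                String::length,
                100,
                OutputMode.ORDERED);
        System.out.println(lengths);
    }
}
```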
> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Jamie Grier
> Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a
> DataStream. The classic example would be joining against an external
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the
> Flink API. Ideally this could simply take the form of a new operator that
> manages async operations, keeps so many of them in flight, and then emits
> results to downstream operators as the async operations complete.
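
The behaviour requested above (a bounded number of async operations in flight, with results emitted as they complete) can be sketched in plain Java roughly as follows. This is only an illustration built on `java.util.concurrent`, not the Flink operator from the pull request; `BoundedAsyncSketch`, `process`, and `maxInFlight` are made-up names, and the "downstream" is just a list.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

public class BoundedAsyncSketch {

    // Starts one async call per input, never allowing more than maxInFlight
    // unfinished calls, and collects results in completion (unordered) order.
    public static <IN, OUT> List<OUT> process(
            List<IN> inputs,
            Function<IN, CompletableFuture<OUT>> asyncFunc,
            int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<OUT> emitted = new CopyOnWriteArrayList<>();

        for (IN element : inputs) {
            // Blocks the caller once maxInFlight calls are pending (back-pressure).
            permits.acquireUninterruptibly();
            asyncFunc.apply(element).whenComplete((result, error) -> {
                // Assumption for this sketch: failed calls are silently dropped.
                if (error == null) {
                    emitted.add(result);
                }
                permits.release();
            });
        }

        // Taking every permit back means all pending calls have finished.
        permits.acquireUninterruptibly(maxInFlight);
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> doubled = process(
                java.util.Arrays.asList(1, 2, 3, 4),
                x -> CompletableFuture.supplyAsync(() -> x * 2),
                2);
        // Order depends on completion timing, so only the size is stable.
        System.out.println(doubled.size());
    }
}
```

The semaphore plays the role of the in-flight limit: acquiring before each call and releasing in the completion callback keeps the number of pending operations at or below `maxInFlight`.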