[
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897029#comment-15897029
]
ASF GitHub Bot commented on FLINK-4460:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3438#discussion_r104377167
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
}
/**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ * in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
+
+ TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+ processFunction,
+ ProcessFunction.class,
+ false,
+ true,
+ getType(),
+ Utils.getCallLocationName(),
+ true);
+
+ return process(processFunction, outType);
+ }
+
+ /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ * in the stream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> process(
--- End diff --
Is this internal method exposed as `public` only for the Scala API? If so,
I'm wondering whether it makes sense to call `transform` directly in the Scala
`DataStream` API instead.
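To make the suggestion concrete, here is a minimal, self-contained sketch of the pattern. It does not use Flink classes: `JavaStream`, `WrapperStream`, and `Operator` are hypothetical stand-ins for the Java `DataStream`, the Scala `DataStream` wrapper, and a stream operator, and `Class<R>` stands in for `TypeInformation<R>`. The assumption is that the wrapper already knows the output type, so it can go through the general-purpose `transform` entry point and the Java class would not need a second `public` overload marked `@Internal`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TransformSketch {

    /** Hypothetical operator: turns one input element into zero or more outputs. */
    interface Operator<T, R> {
        List<R> processElement(T value);
    }

    /** Toy stand-in for the Java-side stream; this is not Flink's DataStream. */
    static final class JavaStream<T> {
        private final List<T> elements;

        JavaStream(List<T> elements) {
            this.elements = elements;
        }

        /** The one general-purpose entry point; it takes an explicit output type. */
        <R> JavaStream<R> transform(String name, Class<R> outType, Operator<T, R> op) {
            List<R> out = new ArrayList<>();
            for (T element : elements) {
                for (R result : op.processElement(element)) {
                    // The explicit type stands in for TypeInformation in this sketch.
                    out.add(outType.cast(result));
                }
            }
            return new JavaStream<>(out);
        }

        List<T> elements() {
            return elements;
        }
    }

    /** Toy stand-in for the language wrapper (the Scala DataStream in the review). */
    static final class WrapperStream<T> {
        private final JavaStream<T> javaStream;

        WrapperStream(JavaStream<T> javaStream) {
            this.javaStream = javaStream;
        }

        /** Calls transform directly, so JavaStream needs no extra public overload. */
        <R> WrapperStream<R> process(Operator<T, R> op, Class<R> outType) {
            return new WrapperStream<>(javaStream.transform("Process", outType, op));
        }

        List<T> elements() {
            return javaStream.elements();
        }
    }

    /** Splits a line into words; an empty line yields no output elements. */
    static List<String> splitWords(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    /** Runs the wrapper over three lines, one of them empty. */
    static List<String> run() {
        WrapperStream<String> lines =
                new WrapperStream<>(new JavaStream<>(Arrays.asList("a b", "", "c")));
        return lines.process(TransformSketch::splitWords, String.class).elements();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Whether this fits the actual PR depends on how the Scala API builds the operator for a `ProcessFunction`, which the diff excerpt above does not show.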
> Side Outputs in Flink
> ---------------------
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
> Issue Type: New Feature
> Components: Core, DataStream API
> Affects Versions: 1.2.0, 1.1.3
> Reporter: Chen Qin
> Assignee: Chen Qin
> Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing