[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897060#comment-15897060 ]
ASF GitHub Bot commented on FLINK-4460:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104383800

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
 	}

 	/**
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby
+	 * creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements.
+	 *
+	 * @param processFunction The {@link ProcessFunction} that is called for each element
+	 *                      in the stream.
+	 *
+	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
+
+		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+				processFunction,
+				ProcessFunction.class,
+				false,
+				true,
+				getType(),
+				Utils.getCallLocationName(),
+				true);
+
+		return process(processFunction, outType);
+	}
+
+	/**
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby
+	 * creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements.
+	 *
+	 * @param processFunction The {@link ProcessFunction} that is called for each element
+	 *                      in the stream.
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 *
+	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> process(
--- End diff --

Yes, it's exposed for that.
The pattern so far is that each such method also has a public overload that takes a `TypeInformation`, because in the Scala API we get the `TypeInformation` from the context bound. Calling `transform()` manually is an option, but if we did that we would basically no longer base the Scala API on the Java API, and we would have code that instantiates the stream operators in both the Java and the Scala API. For example, right now the code for instantiating a flat map operator lives in `(Java)DataStream`, and `(Scala)DataStream.flatMap()` calls that method.

What do you think?

> Side Outputs in Flink
> ---------------------
>
>                 Key: FLINK-4460
>                 URL: https://issues.apache.org/jira/browse/FLINK-4460
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core, DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Chen Qin
>            Assignee: Chen Qin
>              Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
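
To make the delegation pattern from the comment above concrete, here is a minimal sketch in plain Java. Everything in it is a hypothetical stand-in and not Flink's API: `TypeInfo` plays the role of `TypeInformation`, `JavaStream` the role of the Java `DataStream`, `WrapperStream` the role of a Scala-side wrapper that already has the type information in hand, and the `operatorsCreated` counter exists only to show that the operator wiring runs in one place. The type "extraction" in the one-argument overload is a deliberately crude assumption (it looks at one sample result) and is not how `TypeExtractor` works.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TypeInfoDelegationSketch {

    /** Hypothetical stand-in for a type descriptor (not Flink's TypeInformation). */
    static final class TypeInfo<T> {
        final String name;

        TypeInfo(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for the Java-side stream; operator wiring lives only here. */
    static final class JavaStream<T> {
        // Counts how often the operator wiring below has run.
        static int operatorsCreated = 0;

        final List<T> elements;
        final TypeInfo<T> type;

        JavaStream(List<T> elements, TypeInfo<T> type) {
            this.elements = elements;
            this.type = type;
        }

        /** Convenience overload: derives the output type itself, then delegates. */
        <R> JavaStream<R> map(Function<T, R> fn) {
            // Crude stand-in for reflective type extraction: inspect one sample result.
            Object sample = elements.isEmpty() ? null : fn.apply(elements.get(0));
            String name = sample == null ? "Object" : sample.getClass().getSimpleName();
            return map(fn, new TypeInfo<R>(name));
        }

        /** Overload with explicit type info: the single place that builds the operator. */
        <R> JavaStream<R> map(Function<T, R> fn, TypeInfo<R> outType) {
            operatorsCreated++;
            List<R> out = new ArrayList<>();
            for (T element : elements) {
                out.add(fn.apply(element));
            }
            return new JavaStream<>(out, outType);
        }
    }

    /** Hypothetical stand-in for a wrapper API that supplies the type info itself. */
    static final class WrapperStream<T> {
        final JavaStream<T> javaStream;

        WrapperStream(JavaStream<T> javaStream) {
            this.javaStream = javaStream;
        }

        /** Does no operator wiring of its own; it only forwards to the Java-side overload. */
        <R> WrapperStream<R> map(Function<T, R> fn, TypeInfo<R> outType) {
            return new WrapperStream<>(javaStream.map(fn, outType));
        }
    }

    public static void main(String[] args) {
        JavaStream<String> words =
                new JavaStream<>(Arrays.asList("a", "bb"), new TypeInfo<String>("String"));

        // Java-style call: the type is derived inside the convenience overload.
        JavaStream<Integer> viaJava = words.map(String::length);

        // Wrapper-style call: the type is passed in explicitly.
        WrapperStream<Integer> viaWrapper =
                new WrapperStream<>(words).map(String::length, new TypeInfo<Integer>("Integer"));

        System.out.println(viaJava.elements + " " + viaJava.type.name);
        System.out.println(viaWrapper.javaStream.elements + " " + viaWrapper.javaStream.type.name);
    }
}
```

Both call paths end up in the two-argument `map`, which is the point of exposing it: the wrapper never needs its own copy of the operator-instantiation code.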