[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897060#comment-15897060 ]

ASF GitHub Bot commented on FLINK-4460:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3438#discussion_r104383800
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
    @@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
        }
     
        /**
    +    * Applies the given {@link ProcessFunction} on the input stream, thereby
    +    * creating a transformed output stream.
    +    *
    +    * <p>The function will be called for every element in the input streams and can produce zero
    +    * or more output elements.
    +    *
    +    * @param processFunction The {@link ProcessFunction} that is called for each element
    +    *                      in the stream.
    +    *
    +    * @param <R> The type of elements emitted by the {@code ProcessFunction}.
    +    *
    +    * @return The transformed {@link DataStream}.
    +    */
    +   @PublicEvolving
    +   public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
    +
    +           TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
    +                           processFunction,
    +                           ProcessFunction.class,
    +                           false,
    +                           true,
    +                           getType(),
    +                           Utils.getCallLocationName(),
    +                           true);
    +
    +           return process(processFunction, outType);
    +   }
    +
    +   /**
    +    * Applies the given {@link ProcessFunction} on the input stream, thereby
    +    * creating a transformed output stream.
    +    *
    +    * <p>The function will be called for every element in the input streams and can produce zero
    +    * or more output elements.
    +    *
    +    * @param processFunction The {@link ProcessFunction} that is called for each element
    +    *                      in the stream.
    +    * @param outputType {@link TypeInformation} for the result type of the function.
    +    *
    +    * @param <R> The type of elements emitted by the {@code ProcessFunction}.
    +    *
    +    * @return The transformed {@link DataStream}.
    +    */
    +    *
    +    * @return The transformed {@link DataStream}.
    +    */
    +   @Internal
    +   public <R> SingleOutputStreamOperator<R> process(
    --- End diff --
    
    Yes, it's exposed for that. The pattern so far is for such methods to also have a public variant that takes a `TypeInformation`, because in the Scala API we get the `TypeInformation` from the context bound.
    
    Calling `transform()` manually is an option, but if we do that the Scala API would basically no longer be based on the Java API, and we would have code that instantiates the stream operators in both the Java and the Scala API. For example, right now the code for instantiating a flat map operator lives in `(Java)DataStream`, and `(Scala)DataStream.flatMap()` calls that method.
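
The delegation described above can be sketched in plain Java. This is only a toy model under stated assumptions, not Flink code: `TypeInfo`, `JavaStream`, and `WrapperStream` are hypothetical stand-ins for `TypeInformation`, the Java `DataStream`, and the Scala `DataStream`, and the "operator" is just a loop. The point it tries to show is that the wrapper supplies the type descriptor explicitly and calls the Java-side overload, so the operator is built in only one place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model only: none of these classes are Flink classes.
final class DelegationSketch {

    /** Hypothetical stand-in for a type descriptor such as TypeInformation. */
    static final class TypeInfo<T> {
        final Class<T> clazz;

        TypeInfo(Class<T> clazz) {
            this.clazz = clazz;
        }
    }

    /** Hypothetical stand-in for the Java-side stream class. */
    static final class JavaStream<T> {
        // Counts how often the "operator" below has been instantiated.
        static int operatorsCreated = 0;

        final List<T> elements;
        final TypeInfo<T> type;

        JavaStream(List<T> elements, TypeInfo<T> type) {
            this.elements = elements;
            this.type = type;
        }

        /** Overload with an explicit output type: the one place that builds the operator. */
        <R> JavaStream<R> flatMap(Function<T, List<R>> fn, TypeInfo<R> outType) {
            operatorsCreated++;
            List<R> out = new ArrayList<>();
            for (T element : elements) {
                out.addAll(fn.apply(element));
            }
            return new JavaStream<>(out, outType);
        }
    }

    /** Hypothetical stand-in for the Scala-side wrapper around the Java stream. */
    static final class WrapperStream<T> {
        final JavaStream<T> javaStream;

        WrapperStream(JavaStream<T> javaStream) {
            this.javaStream = javaStream;
        }

        /** Already has the type descriptor, so it only delegates to the Java overload. */
        <R> WrapperStream<R> flatMap(Function<T, List<R>> fn, TypeInfo<R> outType) {
            return new WrapperStream<>(javaStream.flatMap(fn, outType));
        }
    }

    public static void main(String[] args) {
        JavaStream<String> lines =
                new JavaStream<>(Arrays.asList("a b", "c"), new TypeInfo<>(String.class));
        WrapperStream<String> words = new WrapperStream<>(lines)
                .flatMap(line -> Arrays.asList(line.split(" ")), new TypeInfo<>(String.class));
        System.out.println(words.javaStream.elements);
    }
}
```

In this sketch, building the operator directly inside `WrapperStream.flatMap` would correspond to calling `transform()` manually: it would work, but the loop would then exist in both classes.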
    
    What do you think?


> Side Outputs in Flink
> ---------------------
>
>                 Key: FLINK-4460
>                 URL: https://issues.apache.org/jira/browse/FLINK-4460
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core, DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Chen Qin
>            Assignee: Chen Qin
>              Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
