[ 
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815885#comment-16815885
 ] 

Kurt Young commented on FLINK-11974:
------------------------------------

[~pnowojski] Regarding to  #1, my gut feeling is it might be okay if we only 
have 1 or 2 OneInputOperatorWrapper chained in a single StreamTask. If we have 
more chaining operators, especially after we make two input operator chaining 
possible, even a lightweight wrapper will certainly reduce the possibility to 
be inlined by JIT.

I agree with that StreamOperatorSubstitutor is something like OperatorFactory, 
maybe we can see it as a simplest OperatorFactory version. If we want to 
introduce this concept, how many changes we want to introduce at first step? 
Are we going to change the existing codes, .e.g. how DataStream API transfer 
stream operators to runtime? If not, we must find a solution to let 
OperatorFactory co-exist with current path, this will involve more thinking and 
design.  

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator 
> CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need CodeGen an entire Operator, one possible solution is to introduce 
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's 
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a 
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the 
> streamTask by
>  * serialization, and produce an actual stream operator to the streamTask, 
> who uses the actual
>  * stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>    /**
>     * Produces the actual stream operator.
>     *
>     * @param userCodeClassLoader the user code class loader to use.
>     * @return the actual stream operator created on {@code StreamTask}.
>     */
>    StreamOperator<OUT> getActualStreamOperator(ClassLoader 
> userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
>    return (T) ((StreamOperatorSubstitutor) 
> operator).getActualStreamOperator(cl);
> } else {
>    return (T) operator;
> }
> {code}
> to get the real operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to