[ 
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815208#comment-16815208
 ] 

Jingsong Lee commented on FLINK-11974:
--------------------------------------

Hi [~pnowojski] If we want to code generate a OneInputStreamOperator, we will 
get a String of the java class. But how we take it to runtime to execution?

First way is introduce a OneInputOperatorWrapper to proxy all 
OneInputStreamOperator method calls. In setup, it will cook generated class and 
newInstance. Just like 
[https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/OneInputOperatorWrapper.java]
 do.

Second way is introduce a StreamOperatorSubstitutor to implement 
getActualStreamOperator, it will cook generated class and newInstance in 
getActualStreamOperator, in StreamConfig.getStreamOperator will invoke it,  In 
this way, the execution of Task can get the real OneInputStreamOperator. 
Compared with the first method, it can make less method calls. Just like 
[https://github.com/JingsongLi/flink/blob/benchmarkop/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/OneInputOperatorWrapper.java]
 do.

 

But as you said, Whether this virtual function call is lossy or not requires 
benchmark. I will do some benchmarks.

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator 
> CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need CodeGen an entire Operator, one possible solution is to introduce 
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's 
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a 
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the 
> streamTask by
>  * serialization, and produce an actual stream operator to the streamTask, 
> who uses the actual
>  * stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>    /**
>     * Produces the actual stream operator.
>     *
>     * @param userCodeClassLoader the user code class loader to use.
>     * @return the actual stream operator created on {@code StreamTask}.
>     */
>    StreamOperator<OUT> getActualStreamOperator(ClassLoader 
> userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
>    return (T) ((StreamOperatorSubstitutor) 
> operator).getActualStreamOperator(cl);
> } else {
>    return (T) operator;
> }
> {code}
> to get the real operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to