[ 
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798987#comment-16798987
 ] 

Piotr Nowojski commented on FLINK-11974:
----------------------------------------

[~lzljs3620320] could elaborate a little bit more what's the problem and how 
are you trying to fix it?

As I understand it, the problem are the virtual calls for the operator 
instances? If that's the case I think I don't fully understand your proposed 
solution. 

I have never researched this, but as far as I recall we could just load each 
operator in separate class loader and let JIT take care of devirtualizing all 
of the classes, not only the operator. The idea here is that as long as there 
is only one or two implementations of given class present, JIT can devirtualize 
those calls. Now loading each instance in separate class loaders we might be 
able to leverage this optimisation. 

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator 
> CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need CodeGen an entire Operator, one possible solution is to introduce 
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's 
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a 
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the 
> streamTask by
>  * serialization, and produce an actual stream operator to the streamTask, 
> who uses the actual
>  * stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>    /**
>     * Produces the actual stream operator.
>     *
>     * @param userCodeClassLoader the user code class loader to use.
>     * @return the actual stream operator created on {@code StreamTask}.
>     */
>    StreamOperator<OUT> getActualStreamOperator(ClassLoader 
> userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
>    return (T) ((StreamOperatorSubstitutor) 
> operator).getActualStreamOperator(cl);
> } else {
>    return (T) operator;
> }
> {code}
> to get the real operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to