[ 
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815546#comment-16815546
 ] 

Piotr Nowojski commented on FLINK-11974:
----------------------------------------

Thanks for the more detailed explanation [~lzljs3620320], I'm starting to 
understand the problem. A couple of follow-up questions/comments:
1. I would expect the current {{OneInputOperatorWrapper}} to be inlined by the 
JIT, effectively doing automatically what you are trying to achieve with 
{{StreamOperatorSubstitutor}}. However, I might be wrong here.
2. Separate class loaders might also achieve this, and even more, as I 
described before. But for this we would indeed need more changes, maybe 
introducing some kind of {{OperatorFactory}}.
3. Isn't {{StreamOperatorSubstitutor}} effectively a slightly odd take on the 
{{OperatorFactory}} concept? If we discard points 1. and 2., maybe we could 
think more in this direction? Instead of serialising {{StreamOperator}} 
instances, we could add the possibility to serialise a 
{{StreamOperatorFactory}} instance that would be used to create the operator. 
It might be effectively the same thing you are proposing, but could prove to be 
more general.
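
To make point 3 a bit more concrete, here is a minimal sketch of what I have in 
mind. All names ({{SketchOperator}}, {{SketchOperatorFactory}}, 
{{LengthOperatorFactory}}, {{FactoryRoundTrip}}) are hypothetical stand-ins 
rather than existing Flink classes, and plain Java serialisation is assumed only 
to mimic shipping the factory to the task:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Flink's StreamOperator; not the real interface.
interface SketchOperator<OUT> {
    OUT process(String input);
}

// Hypothetical factory: this object is serialised and shipped instead of the operator.
interface SketchOperatorFactory<OUT> extends Serializable {
    SketchOperator<OUT> createOperator(ClassLoader userCodeClassLoader);
}

// Example factory that carries only a small serialisable configuration value.
final class LengthOperatorFactory implements SketchOperatorFactory<Integer> {
    private static final long serialVersionUID = 1L;
    private final int offset;

    LengthOperatorFactory(int offset) {
        this.offset = offset;
    }

    @Override
    public SketchOperator<Integer> createOperator(ClassLoader userCodeClassLoader) {
        // A code-generating implementation could compile and load the generated
        // class with userCodeClassLoader here; this sketch just builds a lambda.
        final int o = offset;
        return input -> input.length() + o;
    }
}

final class FactoryRoundTrip {
    // Serialises a value and reads it back, mimicking the transfer to the task.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

The task side would deserialise the factory and call {{createOperator}} once, 
so the operator it runs is the generated one directly, with no wrapper in the 
call path.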

> But as you said, Whether this virtual function call is lossy or not requires 
> benchmark. I will do some benchmarks.

Yes, let's start with that. We need more benchmark coverage to catch possible 
future regressions anyway, and for me the best possible outcome of this ticket 
would be that we do not need to introduce any new concepts/API/internal API 
changes.
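
As a rough starting point, the comparison could look like the sketch below. The 
names ({{RecordProcessor}}, {{GeneratedProcessor}}, {{WrappingProcessor}}, 
{{WrapperOverheadSketch}}) are made up for illustration and are not Flink 
classes. The hand-rolled timing is only indicative; a proper measurement should 
go through a harness such as JMH, which handles warm-up and dead-code 
elimination.

```java
// Hypothetical stand-in for a one-input operator; not Flink's real interface.
interface RecordProcessor {
    long processElement(long value);
}

// Plays the role of the code-generated operator.
final class GeneratedProcessor implements RecordProcessor {
    @Override
    public long processElement(long value) {
        return value * 31 + 7;
    }
}

// Plays the role of the wrapper that proxies every call to the generated operator.
final class WrappingProcessor implements RecordProcessor {
    private final RecordProcessor delegate;

    WrappingProcessor(RecordProcessor delegate) {
        this.delegate = delegate;
    }

    @Override
    public long processElement(long value) {
        return delegate.processElement(value);
    }
}

final class WrapperOverheadSketch {
    // Feeds `records` values through the processor and returns a checksum,
    // so the work has an observable result.
    static long run(RecordProcessor processor, int records) {
        long checksum = 0;
        for (int i = 0; i < records; i++) {
            checksum += processor.processElement(i);
        }
        return checksum;
    }

    // Crude wall-clock timing after one warm-up pass; indicative only.
    static long timeNanos(RecordProcessor processor, int records) {
        run(processor, records);
        long start = System.nanoTime();
        run(processor, records);
        return System.nanoTime() - start;
    }
}
```

Calling {{timeNanos}} for a {{GeneratedProcessor}} and for a 
{{WrappingProcessor}} around it gives a first impression of whether the extra 
indirection survives JIT compilation.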

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator 
> CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need to CodeGen an entire Operator, one possible solution is to 
> introduce an OperatorWrapper, generate a CodeGen sub-Operator in 
> OperatorWrapper's open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a 
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the streamTask by
>  * serialization, and produce an actual stream operator to the streamTask, who uses the actual
>  * stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>    /**
>     * Produces the actual stream operator.
>     *
>     * @param userCodeClassLoader the user code class loader to use.
>     * @return the actual stream operator created on {@code StreamTask}.
>     */
>    StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator instanceof StreamOperatorSubstitutor) {
>    return (T) ((StreamOperatorSubstitutor) operator).getActualStreamOperator(cl);
> } else {
>    return (T) operator;
> }
> {code}
> to get the real operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
