[
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16800353#comment-16800353
]
Jingsong Lee commented on FLINK-11974:
--------------------------------------
Hi [~pnowojski]
I think the introduction of Stream Operator Substitutor is a better solution:
1.As you say, there is only one or two implementations of given class present,
JIT can devirtualize those calls. But in the actual production environment,
most cases have OperatorChain, and there are more than two StreamOperator in
the same ClassLoader, so in most cases, JVM cannot devirtualize those calls.
2.Compared with OperatorWrapper, we don't have to proxy all the methods once,
which is simpler.
> Introduce StreamOperatorSubstitutor to help table perform the whole Operator
> CodeGen
> ------------------------------------------------------------------------------------
>
> Key: FLINK-11974
> URL: https://issues.apache.org/jira/browse/FLINK-11974
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Operators
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> If we need CodeGen an entire Operator, one possible solution is to introduce
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a
> StreamOperatorSubstitutor:
> {code:java}
> /**
> * Basic interface for stream operator substitutes. It is transferred to the
> streamTask by
> * serialization, and produce an actual stream operator to the streamTask,
> who uses the actual
> * stream operator to run.
> *
> * @param <OUT> output type of the actual stream operator
> */
> public interface StreamOperatorSubstitutor<OUT> {
> /**
> * Produces the actual stream operator.
> *
> * @param userCodeClassLoader the user code class loader to use.
> * @return the actual stream operator created on {@code StreamTask}.
> */
> StreamOperator<OUT> getActualStreamOperator(ClassLoader
> userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
> return (T) ((StreamOperatorSubstitutor)
> operator).getActualStreamOperator(cl);
> } else {
> return (T) operator;
> }
> {code}
> to get the real operator.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)