[
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817622#comment-16817622
]
Piotr Nowojski commented on FLINK-11974:
----------------------------------------
Let's wait for some benchmark results.
> how many changes we want to introduce at first step?
I don't know, but I think we would have to consider this before committing
{{StreamOperatorSubstitutor}} in order to avoid going in completely different
direction compared to a perfect world. For example I would hope that
{{OperatorFactory#create}} could help us get rid of {{StreamOperator#setup}}
method and this in turn could allow us to make some fields in
{{StreamOperator}} final and have a better lifecycle contract.
> Introduce StreamOperatorSubstitutor to help table perform the whole Operator
> CodeGen
> ------------------------------------------------------------------------------------
>
> Key: FLINK-11974
> URL: https://issues.apache.org/jira/browse/FLINK-11974
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Operators
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> If we need CodeGen an entire Operator, one possible solution is to introduce
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a
> StreamOperatorSubstitutor:
> {code:java}
> /**
> * Basic interface for stream operator substitutes. It is transferred to the
> streamTask by
> * serialization, and produce an actual stream operator to the streamTask,
> who uses the actual
> * stream operator to run.
> *
> * @param <OUT> output type of the actual stream operator
> */
> public interface StreamOperatorSubstitutor<OUT> {
> /**
> * Produces the actual stream operator.
> *
> * @param userCodeClassLoader the user code class loader to use.
> * @return the actual stream operator created on {@code StreamTask}.
> */
> StreamOperator<OUT> getActualStreamOperator(ClassLoader
> userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
> return (T) ((StreamOperatorSubstitutor)
> operator).getActualStreamOperator(cl);
> } else {
> return (T) operator;
> }
> {code}
> to get the real operator.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)