[
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16803810#comment-16803810
]
Jingsong Lee commented on FLINK-11974:
--------------------------------------
I think it difficult to separate ClassLoader because now each Task has a
separate ClassLoader. That will bring about considerable changes.
> What do you mean?
I think there are only two ways to do this: provide StreamOperatorSubstitutor
or provide OperatorWrapper to invoke every method of CodeGenOperator. Any
equally easier way?
> Introduce StreamOperatorSubstitutor to help table perform the whole Operator
> CodeGen
> ------------------------------------------------------------------------------------
>
> Key: FLINK-11974
> URL: https://issues.apache.org/jira/browse/FLINK-11974
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Operators
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> If we need CodeGen an entire Operator, one possible solution is to introduce
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a
> StreamOperatorSubstitutor:
> {code:java}
> /**
> * Basic interface for stream operator substitutes. It is transferred to the
> streamTask by
> * serialization, and produce an actual stream operator to the streamTask,
> who uses the actual
> * stream operator to run.
> *
> * @param <OUT> output type of the actual stream operator
> */
> public interface StreamOperatorSubstitutor<OUT> {
> /**
> * Produces the actual stream operator.
> *
> * @param userCodeClassLoader the user code class loader to use.
> * @return the actual stream operator created on {@code StreamTask}.
> */
> StreamOperator<OUT> getActualStreamOperator(ClassLoader
> userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
> return (T) ((StreamOperatorSubstitutor)
> operator).getActualStreamOperator(cl);
> } else {
> return (T) operator;
> }
> {code}
> to get the real operator.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)