[
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815208#comment-16815208
]
Jingsong Lee commented on FLINK-11974:
--------------------------------------
Hi [~pnowojski] If we want to code generate a OneInputStreamOperator, we will
get a String of the java class. But how we take it to runtime to execution?
First way is introduce a OneInputOperatorWrapper to proxy all
OneInputStreamOperator method calls. In setup, it will cook generated class and
newInstance. Just like
[https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/OneInputOperatorWrapper.java]
do.
Second way is introduce a StreamOperatorSubstitutor to implement
getActualStreamOperator, it will cook generated class and newInstance in
getActualStreamOperator, in StreamConfig.getStreamOperator will invoke it, In
this way, the execution of Task can get the real OneInputStreamOperator.
Compared with the first method, it can make less method calls. Just like
[https://github.com/JingsongLi/flink/blob/benchmarkop/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/OneInputOperatorWrapper.java]
do.
But as you said, Whether this virtual function call is lossy or not requires
benchmark. I will do some benchmarks.
> Introduce StreamOperatorSubstitutor to help table perform the whole Operator
> CodeGen
> ------------------------------------------------------------------------------------
>
> Key: FLINK-11974
> URL: https://issues.apache.org/jira/browse/FLINK-11974
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Operators
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> If we need CodeGen an entire Operator, one possible solution is to introduce
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a
> StreamOperatorSubstitutor:
> {code:java}
> /**
> * Basic interface for stream operator substitutes. It is transferred to the
> streamTask by
> * serialization, and produce an actual stream operator to the streamTask,
> who uses the actual
> * stream operator to run.
> *
> * @param <OUT> output type of the actual stream operator
> */
> public interface StreamOperatorSubstitutor<OUT> {
> /**
> * Produces the actual stream operator.
> *
> * @param userCodeClassLoader the user code class loader to use.
> * @return the actual stream operator created on {@code StreamTask}.
> */
> StreamOperator<OUT> getActualStreamOperator(ClassLoader
> userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
> return (T) ((StreamOperatorSubstitutor)
> operator).getActualStreamOperator(cl);
> } else {
> return (T) operator;
> }
> {code}
> to get the real operator.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)