[ https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798987#comment-16798987 ]
Piotr Nowojski commented on FLINK-11974:
----------------------------------------

[~lzljs3620320] could you elaborate a little more on what the problem is and how you are trying to fix it? As I understand it, the problem is the virtual calls on the operator instances? If that's the case, I don't think I fully understand your proposed solution.

I have never researched this, but as far as I recall we could just load each operator in a separate class loader and let the JIT take care of devirtualizing all of the classes, not only the operator. The idea is that as long as only one or two implementations of a given class are loaded, the JIT can devirtualize those calls. By loading each instance in a separate class loader, we might be able to leverage this optimisation.

> Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11974
>                 URL: https://issues.apache.org/jira/browse/FLINK-11974
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> If we need to CodeGen an entire Operator, one possible solution is to introduce
> an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's
> open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the streamTask by
>  * serialization, and produces the actual stream operator for the streamTask, which uses the
>  * actual stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>     /**
>      * Produces the actual stream operator.
>      *
>      * @param userCodeClassLoader the user code class loader to use.
>      * @return the actual stream operator created on {@code StreamTask}.
>      */
>     StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
>     return (T) ((StreamOperatorSubstitutor) operator).getActualStreamOperator(cl);
> } else {
>     return (T) operator;
> }
> {code}
> to get the real operator.
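
For readers following the thread, here is a minimal, self-contained sketch of the substitution idea described in the issue. It is not Flink code: `StreamOperator` is a hypothetical one-method stand-in for the real interface, and `GeneratedUpperCase`, `UpperCaseSubstitutor` and `resolve` are made-up names for illustration. Only `StreamOperatorSubstitutor#getActualStreamOperator` and the `instanceof` check follow the snippets quoted above. The point it tries to show is that the substitute is unwrapped once, so the task ends up holding the generated operator itself rather than a wrapper that proxies every call.

```java
import java.util.Locale;

public class SubstitutorSketch {

    /** Hypothetical, heavily simplified stand-in for Flink's StreamOperator. */
    interface StreamOperator<OUT> {
        OUT process(String input);
    }

    /** Same shape as the interface proposed in the issue. */
    interface StreamOperatorSubstitutor<OUT> {
        StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
    }

    /** Stands in for a code-generated operator; a real one would be compiled at runtime. */
    static final class GeneratedUpperCase implements StreamOperator<String> {
        @Override
        public String process(String input) {
            return input.toUpperCase(Locale.ROOT);
        }
    }

    /** Shipped in place of the operator and asked once for the actual operator. */
    static final class UpperCaseSubstitutor implements StreamOperatorSubstitutor<String> {
        @Override
        public StreamOperator<String> getActualStreamOperator(ClassLoader userCodeClassLoader) {
            // Assumption: code generation would happen here, using the given class loader.
            return new GeneratedUpperCase();
        }
    }

    /** Mirrors the check proposed for StreamConfig.getStreamOperator. */
    @SuppressWarnings("unchecked")
    static <OUT> StreamOperator<OUT> resolve(Object operator, ClassLoader cl) {
        if (operator instanceof StreamOperatorSubstitutor) {
            return ((StreamOperatorSubstitutor<OUT>) operator).getActualStreamOperator(cl);
        }
        // Anything that is not a substitute is returned unchanged.
        return (StreamOperator<OUT>) operator;
    }

    public static void main(String[] args) {
        ClassLoader cl = SubstitutorSketch.class.getClassLoader();
        StreamOperator<String> op = resolve(new UpperCaseSubstitutor(), cl);
        // The task now calls the generated operator directly, with no proxy hop.
        System.out.println(op.getClass().getSimpleName());
        System.out.println(op.process("flink"));
    }
}
```

Whether this actually saves virtual calls compared with a wrapper, or compared with the separate-class-loader approach suggested in the comment, would need to be measured; the sketch only illustrates the control flow.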