[
https://issues.apache.org/jira/browse/FLINK-11974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819287#comment-16819287
]
Piotr Nowojski commented on FLINK-11974:
----------------------------------------
I've done a bit more research and reduced this to a very simple
microbenchmark. The answer seems to be that the JVM is astonishingly bad at
devirtualizing. To avoid the virtual calls we would need either to
code-generate the whole operator chain and its collectors or to process
records in micro-batches, but that's a completely different story.
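To illustrate the kind of call chain in question, here is a minimal sketch. It is not the benchmark I ran and not Flink code: {{Collector}}, {{AddOne}}, {{TimesTwo}} and {{Sink}} are hypothetical stand-ins. In the chained version every record crosses several interface call sites; the fused version is roughly what code generated for the whole chain could look like. Both compute the same result, and any timing comparison would need a proper harness such as JMH.

```java
/** Hypothetical sketch, not Flink code: a chain of collectors vs. a fused loop. */
class CollectorChainSketch {

    /** Illustrative stand-in for an operator output / collector. */
    interface Collector {
        void collect(long value);
    }

    /** Adds one and forwards through an interface call. */
    static final class AddOne implements Collector {
        private final Collector next;
        AddOne(Collector next) { this.next = next; }
        @Override public void collect(long value) { next.collect(value + 1); }
    }

    /** Doubles the value and forwards through an interface call. */
    static final class TimesTwo implements Collector {
        private final Collector next;
        TimesTwo(Collector next) { this.next = next; }
        @Override public void collect(long value) { next.collect(value * 2); }
    }

    /** End of the chain: accumulates a sum. */
    static final class Sink implements Collector {
        long sum;
        @Override public void collect(long value) { sum += value; }
    }

    /** Chained version: each record goes AddOne -> TimesTwo -> Sink via interface calls. */
    static long runChained(int records) {
        Sink sink = new Sink();
        Collector head = new AddOne(new TimesTwo(sink));
        for (int i = 0; i < records; i++) {
            head.collect(i);
        }
        return sink.sum;
    }

    /** Fused version: the same computation with no calls between the steps. */
    static long runFused(int records) {
        long sum = 0;
        for (int i = 0; i < records; i++) {
            sum += (i + 1L) * 2;
        }
        return sum;
    }

    public static void main(String[] args) {
        int n = 1_000;
        System.out.println("results match: " + (runChained(n) == runFused(n)));
    }
}
```

Whether the JIT manages to inline the chained calls depends on how many {{Collector}} implementations it has observed at each call site, which is why the fused form is attractive for code generation.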
I agree that we need some kind of {{OperatorFactory}}, and I talked with
[~StephanEwen], [~srichter] and [~till.rohrmann]. Our consensus was that the
proposed {{StreamOperatorSubstitutor}} is not the ideal solution
({{StreamOperatorSubstitutor}} is basically an {{OperatorFactory}} merged
into the {{Operator}} interface). Apparently my previously mentioned idea of
introducing a proper {{OperatorFactory}} was already on a TODO list of future
ideas, since we wanted to introduce some intermediate operator
representation anyway. From this perspective, adding
{{StreamOperatorSubstitutor}} would unfortunately be wasted effort: it would
complicate our operator APIs and add more code to rewrite in the future.
We are proposing to combine those two efforts (the performance improvement
from this ticket and {{OperatorFactory}}). There are a couple of other things
that we could improve along the way, so I think it would be best to sync up
and discuss all of this. What do you think?
> Are we going to change the existing code, e.g. how the DataStream API
> transfers stream operators to the runtime?
> If not, we must find a solution to let OperatorFactory co-exist with the
> current path; this will involve more thinking and design.
I think that we should allow {{OperatorFactory}} to co-exist with
{{Operator}}; otherwise this would require a huge amount of changes. After a
quick look, it seems relatively easy to either add a
{{StreamFactoryTransformation}} (parallel to and co-existing with the current
{{StreamTransformation}}) or just wrap all operators passed to the existing
{{StreamTransformation}} in a simple/dummy {{StreamOperatorFactory}}. There
are a couple of other things that we could do; for example, we could drop the
current {{StreamOperator#setup(StreamTask<?, ?> containingTask, StreamConfig
config, Output<StreamRecord<OUT>> output)}} method in favour of
{{StreamOperatorFactory#create(StreamTask<?, ?> containingTask, StreamConfig
config, Output<StreamRecord<OUT>> output)}}. This would allow us to store
those parameters in {{final}} fields in the new {{StreamOperator}}
implementations.
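A rough sketch of the two options, to make the discussion concrete. All types here are simplified, hypothetical stand-ins and not the real Flink interfaces: {{create}} takes only the output, whereas the proposal above also passes the containing task and the config. It shows a new-style operator that receives its output in the constructor (so the field can be {{final}}) and a simple/dummy factory that wraps an existing operator still wired through {{setup}}.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-ins; these are not the real Flink interfaces. */
class OperatorFactorySketch {

    /** Stand-in for Output<StreamRecord<OUT>>. */
    interface Output<T> {
        void collect(T record);
    }

    /** Stand-in for StreamOperator. */
    interface Operator<IN> {
        void processElement(IN element);
    }

    /** Stand-in for the proposed factory: builds the operator once the output is known. */
    interface OperatorFactory<IN, OUT> {
        Operator<IN> create(Output<OUT> output);
    }

    /** New-style operator: the output arrives in the constructor, so the field is final. */
    static final class UpperCaseOperator implements Operator<String> {
        private final Output<String> output;
        UpperCaseOperator(Output<String> output) { this.output = output; }
        @Override public void processElement(String element) {
            output.collect(element.toUpperCase());
        }
    }

    /** Existing-style operator: wired after construction through setup(). */
    static final class LegacyTrimOperator implements Operator<String> {
        private Output<String> output; // cannot be final with setup()
        void setup(Output<String> output) { this.output = output; }
        @Override public void processElement(String element) {
            output.collect(element.trim());
        }
    }

    /** Simple/dummy factory that lets an existing operator co-exist with the factory path. */
    static final class SimpleOperatorFactory implements OperatorFactory<String, String> {
        private final LegacyTrimOperator operator;
        SimpleOperatorFactory(LegacyTrimOperator operator) { this.operator = operator; }
        @Override public Operator<String> create(Output<String> output) {
            operator.setup(output);
            return operator;
        }
    }

    /** Plays the role of the task: creates the operator from the factory and feeds it records. */
    static List<String> run(OperatorFactory<String, String> factory, String... inputs) {
        List<String> collected = new ArrayList<>();
        Operator<String> operator = factory.create(collected::add);
        for (String input : inputs) {
            operator.processElement(input);
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run(UpperCaseOperator::new, "a", "b"));
        System.out.println(run(new SimpleOperatorFactory(new LegacyTrimOperator()), " x ", "y "));
    }
}
```

With this shape the task only ever talks to a factory, so a code-generated operator and an existing operator take the same path, and no extra wrapper call is added on the per-record path.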
> Introduce StreamOperatorSubstitutor to help table perform the whole Operator
> CodeGen
> ------------------------------------------------------------------------------------
>
> Key: FLINK-11974
> URL: https://issues.apache.org/jira/browse/FLINK-11974
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Operators
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> If we need to CodeGen an entire Operator, one possible solution is to
> introduce an OperatorWrapper, generate a CodeGen sub-Operator in the
> OperatorWrapper's open, and then proxy all methods to the sub-Operator.
> Doing so results in multiple virtual function calls, so we introduce a
> StreamOperatorSubstitutor:
> {code:java}
> /**
>  * Basic interface for stream operator substitutes. It is transferred to the streamTask by
>  * serialization, and produce an actual stream operator to the streamTask, who uses the actual
>  * stream operator to run.
>  *
>  * @param <OUT> output type of the actual stream operator
>  */
> public interface StreamOperatorSubstitutor<OUT> {
>     /**
>      * Produces the actual stream operator.
>      *
>      * @param userCodeClassLoader the user code class loader to use.
>      * @return the actual stream operator created on {@code StreamTask}.
>      */
>     StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
> }
> {code}
> In StreamConfig.getStreamOperator, we need:
> {code:java}
> if (operator != null && operator instanceof StreamOperatorSubstitutor) {
>     return (T) ((StreamOperatorSubstitutor) operator).getActualStreamOperator(cl);
> } else {
>     return (T) operator;
> }
> {code}
> to get the real operator.