[
https://issues.apache.org/jira/browse/FLINK-1869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501320#comment-14501320
]
Gyula Fora commented on FLINK-1869:
-----------------------------------
Hey,
The purpose of operator chaining is to let the optimizer layer co-locate the
discretizer operator with the preceding operator. For instance, if you have a
source with parallelism 2 and a discretizer with the same parallelism (and no
data shuffling in between), the source passes its output directly to the
discretizer. This greatly improves throughput in these cases.
You are right, the only thing we need to do here is make the Discretizer
classes extend ChainableStreamOperator, which implements the Collector
interface. This allows the discretizer to receive inputs through the collect
method. You should override the default implementation of collect to make it
work properly (or, better, refactor the discretizer methods so that they work
with the default collect implementation).
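To illustrate the idea, here is a rough, self-contained sketch of what
"receiving inputs through collect" means. It does not use the real Flink
classes: Collector, ChainableOperator, CountDiscretizer and runSource below are
simplified stand-ins made up for this example, and the actual
ChainableStreamOperator and discretizer signatures may well differ.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainingSketch {

    // Stand-in for a collector interface: anything that can receive records.
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical chainable base class. It is itself a Collector, so an
    // upstream operator can hand records to it with a plain method call.
    static abstract class ChainableOperator<IN, OUT> implements Collector<IN> {
        protected final Collector<OUT> output;

        ChainableOperator(Collector<OUT> output) {
            this.output = output;
        }

        // Default behaviour: process each incoming record immediately.
        @Override
        public void collect(IN record) {
            processElement(record);
        }

        protected abstract void processElement(IN record);
    }

    // Toy count-based discretizer: emits a window every `size` records.
    // It works with the default collect, so nothing needs to be overridden.
    static class CountDiscretizer<T> extends ChainableOperator<T, List<T>> {
        private final int size;
        private List<T> buffer = new ArrayList<>();

        CountDiscretizer(int size, Collector<List<T>> output) {
            super(output);
            this.size = size;
        }

        @Override
        protected void processElement(T record) {
            buffer.add(record);
            if (buffer.size() == size) {
                output.collect(buffer);
                buffer = new ArrayList<>();
            }
        }
    }

    // Toy source chained directly to its downstream collector: no queue and
    // no serialization in between, just a method call per record.
    static void runSource(int count, Collector<Integer> downstream) {
        for (int i = 1; i <= count; i++) {
            downstream.collect(i);
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> windows = new ArrayList<>();
        runSource(5, new CountDiscretizer<Integer>(2, windows::add));
        System.out.println(windows); // prints [[1, 2], [3, 4]]
    }
}
```

The fifth record stays in the buffer because no full window has formed yet.
The same pattern would also be a convenient way to drive a discretizer in a
unit test: call collect on it directly and inspect what reaches the output
collector.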
The WindowIntegrationTest actually fails if you just change the discretizer to
extend ChainableStreamOperator. But you should also confirm the behaviour by
modifying some of the discretizer tests to pass their inputs through collect.
Gyula
> Make StreamDiscretizers chainable
> ---------------------------------
>
> Key: FLINK-1869
> URL: https://issues.apache.org/jira/browse/FLINK-1869
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Reporter: Gyula Fora
>
> Currently the different StreamDiscretizer operators (StreamDiscretizer,
> GroupedStreamDiscretizer, GroupedActiveDiscretizer) are non-chainable.