[
https://issues.apache.org/jira/browse/FLINK-12215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu updated FLINK-12215:
----------------------------
Fix Version/s: 1.10.0
> Introduce SqlProcessFunction for blink streaming runtime
> --------------------------------------------------------
>
> Key: FLINK-12215
> URL: https://issues.apache.org/jira/browse/FLINK-12215
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Runtime
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Major
> Fix For: 1.10.0
>
>
> Currently, we rely heavily on DataStream's {{KeyedProcessFunction}} when
> implementing the Blink SQL runtime. However, it has some drawbacks that lead
> us to introduce a ProcessFunction of our own for SQL.
> 1. {{KeyedProcessFunction}} does not support {{endInput(Collector)}}.
> This is needed to achieve the same semantics for batch and streaming. For
> example, {{SELECT COUNT(\*) FROM T}} should return {{0}} when the input is
> empty, but currently no result is emitted at all. That's why we need
> {{endInput(Collector)}} to emit a final result. I know this is not a
> real-world streaming use case, but it is worth doing.
> 2. {{KeyedProcessFunction}} is an abstract class.
> As discussed in FLINK-11409, if it were an interface, it would be easy to
> extract some common logic into a base class and share it between
> ProcessFunction, CoProcessFunction and other functions. That doesn't work
> with an abstract class. We ran into the same problem when we wanted to reuse
> some code. However, it's hard to turn {{KeyedProcessFunction}} into an
> interface because of compatibility.
> 3. {{KeyedProcessFunction}} doesn't expose {{setCurrentKey}}.
> We have an optimization for lazy state writing, i.e. buffering the changes
> on the heap and flushing them to state when taking a snapshot. This requires
> changing the current key of the operator/function.
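
To make point 1 concrete, here is a minimal plain-Java sketch of the intended behavior. It does not use any Flink classes: `SimpleCollector`, `EndInputAwareFunction`, `CountAll` and the `run` driver are hypothetical stand-ins invented for illustration, and the per-record update output that a real streaming aggregate would produce is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch without Flink dependencies; every type below is a hypothetical stand-in.
final class EndInputSketch {

    /** Stand-in for a collector; only the one method needed here. */
    interface SimpleCollector<O> {
        void collect(O record);
    }

    /** Stand-in for a process function reduced to the two callbacks used in this sketch. */
    interface EndInputAwareFunction<I, O> {
        void processElement(I value, SimpleCollector<O> out) throws Exception;

        void endInput(SimpleCollector<O> out) throws Exception;
    }

    /** Global COUNT(*) that emits exactly one result once the input has ended. */
    static final class CountAll<I> implements EndInputAwareFunction<I, Long> {
        private long count;

        @Override
        public void processElement(I value, SimpleCollector<Long> out) {
            count++; // accumulate only; nothing is emitted per record in this sketch
        }

        @Override
        public void endInput(SimpleCollector<Long> out) {
            out.collect(count); // emits 0 when no record was ever seen
        }
    }

    /** Hypothetical driver that plays the role of the operator. */
    static <I> List<Long> run(List<I> input) {
        List<Long> results = new ArrayList<>();
        CountAll<I> fn = new CountAll<>();
        for (I record : input) {
            fn.processElement(record, results::add);
        }
        fn.endInput(results::add); // without this callback, empty input yields no output
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of()));              // [0]
        System.out.println(run(List.of("a", "b", "c"))); // [3]
    }
}
```

Without the `endInput` call in `run`, the empty-input case would return an empty list, which is the mismatch with batch semantics described in point 1.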
> That's why we want to introduce a dedicated {{ProcessFunction}} interface
> for SQL. Maybe we can call it {{SqlProcessFunction}}. The name can be
> discussed in the JIRA.
> The initial idea of {{SqlProcessFunction}}:
> {code:java}
> public interface SqlProcessFunction<K, I, O> extends Function {
>
>     void processElement(I value, Context<K> ctx, Collector<O> out) throws Exception;
>
>     void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<O> out) throws Exception;
>
>     void endInput(Context<K> ctx, Collector<O> out) throws Exception;
>
>     interface Context<K> {
>         TimerService timerService();
>
>         K getCurrentKey();
>
>         void setCurrentKey(K key);
>     }
>
>     interface OnTimerContext<K> extends Context<K> {
>         TimeDomain timeDomain();
>     }
> }
> {code}
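
As a rough illustration of why `setCurrentKey` matters for lazy state writing, the following plain-Java sketch buffers per-key counts on the heap and writes them to a keyed store only on flush. It is not Flink code: `KeyedCounterState` and `BufferedCounter` are hypothetical stand-ins, and `flush()` is assumed to be invoked by the operator before a snapshot.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch without Flink dependencies; every type below is a hypothetical stand-in.
final class LazyStateSketch {

    /** Stand-in for keyed state: value() and update() act on whichever key is current. */
    static final class KeyedCounterState<K> {
        private final Map<K, Long> backend = new HashMap<>();
        private K currentKey;
        int backendWrites; // number of update() calls, to make the saving visible

        void setCurrentKey(K key) { currentKey = key; }

        K getCurrentKey() { return currentKey; }

        Long value() { return backend.get(currentKey); }

        void update(long newValue) {
            backend.put(currentKey, newValue);
            backendWrites++;
        }
    }

    /** Counts records per key, buffering changes on the heap until flush() is called. */
    static final class BufferedCounter<K> {
        private final KeyedCounterState<K> state;
        private final Map<K, Long> buffer = new LinkedHashMap<>();

        BufferedCounter(KeyedCounterState<K> state) { this.state = state; }

        void processElement(K key) {
            state.setCurrentKey(key); // in a real runtime the operator does this per record
            Long buffered = buffer.get(key);
            if (buffered == null) {
                Long stored = state.value(); // read through to state on first touch
                buffered = (stored == null) ? 0L : stored;
            }
            buffer.put(key, buffered + 1); // heap write only
        }

        /** Assumed to run before a snapshot: one state write per dirty key. */
        void flush() {
            for (Map.Entry<K, Long> entry : buffer.entrySet()) {
                state.setCurrentKey(entry.getKey()); // the key switch the function needs access to
                state.update(entry.getValue());
            }
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        KeyedCounterState<String> state = new KeyedCounterState<>();
        BufferedCounter<String> counter = new BufferedCounter<>(state);
        for (String key : new String[] {"a", "b", "a", "a"}) {
            counter.processElement(key);
        }
        counter.flush();
        state.setCurrentKey("a");
        System.out.println(state.value());       // 3
        System.out.println(state.backendWrites); // 2
    }
}
```

The flush loop visits keys other than the one belonging to the record currently being processed, so the function must be able to switch the current key itself.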
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)