[ 
https://issues.apache.org/jira/browse/FLINK-12215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-12215:
-------------------------------

    Assignee:     (was: Jark Wu)

> Introduce SqlProcessFunction for blink streaming runtime
> --------------------------------------------------------
>
>                 Key: FLINK-12215
>                 URL: https://issues.apache.org/jira/browse/FLINK-12215
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Runtime
>            Reporter: Jark Wu
>            Priority: Minor
>
> Currently, we rely heavily on DataStream's {{KeyedProcessFunction}} when 
> implementing the Blink SQL runtime. But it has some disadvantages that lead us 
> to introduce a SQL-specific ProcessFunction.
> 1. {{KeyedProcessFunction}} does not support {{endInput(Collector)}}.
> This is needed to achieve the same semantics for batch and streaming. For 
> example, {{SELECT COUNT(\*) FROM T}} should return {{0}} when the input is 
> empty, but currently no result is emitted. That's why we need 
> {{endInput(Collector)}} to emit a final result. I know this is not a 
> real-world streaming use case, but it is worth doing.
> 2. {{KeyedProcessFunction}} is an abstract class. 
> As discussed in FLINK-11409, if it were an interface, it would be easy to 
> extract some common logic into a base class and share it between 
> ProcessFunction, CoProcessFunction, and other functions. That doesn't work 
> when it is an abstract class. We also ran into this problem when we wanted to 
> reuse some code. However, it's hard to turn {{KeyedProcessFunction}} into an 
> interface because of compatibility. 
> 3. {{KeyedProcessFunction}} doesn't expose {{setCurrentKey}}.
> We have an optimization for lazy state writing, i.e. buffering changes 
> on the heap and flushing them to state when taking a snapshot. That requires 
> changing the current key of the operator/function.
> That's why we want to introduce a SQL-specific {{ProcessFunction}} interface. 
> Maybe we can call it {{SqlProcessFunction}}. The name can be discussed in the 
> JIRA.
> The initial idea of {{SqlProcessFunction}}: 
> {code:java}
> public interface SqlProcessFunction<K, I, O> extends Function {
>     void processElement(I value, Context<K> ctx, Collector<O> out) throws Exception;
>
>     void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<O> out) throws Exception;
>
>     void endInput(Context<K> ctx, Collector<O> out) throws Exception;
>
>     interface Context<K> {
>         TimerService timerService();
>
>         K getCurrentKey();
>
>         void setCurrentKey(K key);
>     }
>
>     interface OnTimerContext<K> extends Context<K> {
>         TimeDomain timeDomain();
>     }
> }
> {code}
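
To make point 1 concrete, here is a minimal, self-contained sketch of how an {{endInput}} hook could let a global {{COUNT(*)}} emit {{0}} on empty input. It does not use Flink: {{EndInputSketch}}, its cut-down {{SqlProcessFunction}} (no timers, no {{Context}}), the stand-in {{Collector}}, {{CountFunction}} and {{run}} are all hypothetical names chosen for illustration, so treat it as one possible reading of the proposal rather than the intended implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration only; these are NOT Flink's types.
final class EndInputSketch {

    /** Stand-in for Flink's Collector: receives emitted records. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Cut-down version of the proposed interface (timers and Context omitted). */
    interface SqlProcessFunction<I, O> {
        void processElement(I value, Collector<O> out) throws Exception;

        void endInput(Collector<O> out) throws Exception;
    }

    /** Global COUNT(*): emits the running count per row, and 0 if no row ever arrived. */
    static final class CountFunction implements SqlProcessFunction<String, Long> {
        private long count = 0;
        private boolean emitted = false;

        @Override
        public void processElement(String value, Collector<Long> out) {
            count++;
            emitted = true;
            out.collect(count);
        }

        @Override
        public void endInput(Collector<Long> out) {
            // Without this hook an empty input would produce no result at all.
            if (!emitted) {
                out.collect(0L);
            }
        }
    }

    /** Assumed driver: feeds a bounded input through the function, then signals end of input. */
    static List<Long> run(List<String> input) {
        List<Long> results = new ArrayList<>();
        CountFunction fn = new CountFunction();
        for (String row : input) {
            fn.processElement(row, results::add);
        }
        fn.endInput(results::add);
        return results;
    }
}
```

In this sketch an empty input yields a single result of 0, which matches the batch semantics described in point 1, while a non-empty input only yields the running counts emitted by {{processElement}}.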



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
