[ https://issues.apache.org/jira/browse/FLINK-19449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17297055#comment-17297055 ]

laiyijie commented on FLINK-19449:
----------------------------------

Li, an implementation without the index may not be ideal, but it works as a
temporary solution. The `retract` and `merge` functions are not that important
for LAG. I suggest implementing a LAG function that takes an index (offset)
argument. In streaming mode it would cover more use cases than a version
without one.
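For reference, here is what "LAG with an index" should return. This is a minimal sketch over a partition that is already sorted by the ORDER BY key. The class `LagSemantics` and the method `lag` are hypothetical names used only for illustration, not Flink API. An offset that points outside the partition yields `null`, which matches SQL LAG when no default value is given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LagSemantics {

    /**
     * Reference semantics of LAG(value, offset) over an already ordered partition:
     * row i receives the value of row i - offset, or null if that row does not exist.
     */
    static <T> List<T> lag(List<T> ordered, int offset) {
        List<T> out = new ArrayList<>(ordered.size());
        for (int i = 0; i < ordered.size(); i++) {
            int src = i - offset;
            out.add(src >= 0 && src < ordered.size() ? ordered.get(src) : null);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Float> prices = Arrays.asList(10f, 11f, 12f, 13f);
        System.out.println(lag(prices, 1)); // [null, 10.0, 11.0, 12.0]
        System.out.println(lag(prices, 2)); // [null, null, 10.0, 11.0]
    }
}
```

A batch engine can compute this with random access to the sorted partition. A streaming operator cannot, so it has to buffer the preceding rows instead.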

I have tried to implement LAG as a UDF without `retract` and `merge`, but UDFs
have a restriction: a single function name such as `LAG` cannot support
different data types.
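My assumption about where this restriction comes from, not verified against Flink's type extraction code, is as follows. Flink infers a UDF's result type by reflecting over the function class, and Java erases `T` on a plain generic instance. As a result, only a concrete subclass such as `FloatLag` pins the type. The plain-Java sketch below illustrates that erasure mechanism. `GenericLag`, the stand-in `FloatLag`, and `resolvedT` are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeErasureDemo {

    /** Stand-in for a generic UDF base class such as LagAggFunction<T>. */
    static class GenericLag<T> {}

    /** A concrete subclass records T in its class signature, like FloatLag does. */
    static class FloatLag extends GenericLag<Float> {}

    /** Returns the first type argument of clazz's direct generic superclass, or null if none. */
    static Type resolvedT(Class<?> clazz) {
        Type sup = clazz.getGenericSuperclass();
        if (sup instanceof ParameterizedType) {
            return ((ParameterizedType) sup).getActualTypeArguments()[0];
        }
        return null;
    }

    public static void main(String[] args) {
        // Concrete subclass: T is recoverable via reflection.
        System.out.println(resolvedT(FloatLag.class)); // class java.lang.Float
        // Plain generic instance: T is erased at runtime, nothing to recover.
        GenericLag<Float> erased = new GenericLag<>();
        System.out.println(resolvedT(erased.getClass())); // null
    }
}
```

In practice this means writing one subclass per data type and registering each one under its own function name.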

I tested the UDF in streaming mode, and the results are correct. I also believe
that in streaming mode, `accumulate` is called in row order for an OVER window
with ORDER BY.
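The ordering assumption can be checked outside Flink with a small simulation. This sketch assumes, as described above, that `accumulate` is invoked once per row in ORDER BY order. It also assumes the result is read right after each call, which is roughly how an unbounded OVER aggregation emits rows. `LagState` and `run` are hypothetical names, not Flink API. The simulation also shows that the per-partition state stays bounded at `offset + 1` values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class StreamingLagSimulation {

    /** Incremental LAG state for one partition, mirroring the UDF's accumulator. */
    static final class LagState<T> {
        private final int offset;
        // LinkedList (not ArrayDeque) so that NULL input values are allowed.
        private final LinkedList<T> window = new LinkedList<>();

        LagState(int offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("offset must be >= 0");
            }
            this.offset = offset;
        }

        /** Called once per row, assumed to be in ORDER BY order. */
        void accumulate(T value) {
            window.addLast(value);
            // Keep only the current row plus the `offset` rows before it.
            while (window.size() - 1 > offset) {
                window.removeFirst();
            }
        }

        /** LAG result for the most recently accumulated row. */
        T getValue() {
            return window.size() - 1 < offset ? null : window.getFirst();
        }
    }

    /** Feeds rows one at a time and emits one LAG result per row. */
    static <T> List<T> run(List<T> orderedRows, int offset) {
        LagState<T> state = new LagState<>(offset);
        List<T> out = new ArrayList<>();
        for (T row : orderedRows) {
            state.accumulate(row);
            out.add(state.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 2)); // [null, null, 1, 2, 3]
    }
}
```

If rows ever arrived out of order, the buffer would hold the wrong neighbours. The correctness of this approach therefore rests entirely on that ordering guarantee.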

Ultimately, what I want to solve is the problem that LAG cannot be used in
streaming mode, with index support of course.

Thanks for reading. I would really like to fix this problem and help make
Flink's basic functions better.

Here is my UDF for `LAG` with an index:

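```java
import java.util.LinkedList;

import org.apache.flink.table.functions.AggregateFunction;

/** LAG(value, offset) as a user-defined aggregate function, without retract/merge. */
public class LagAggFunction<T> extends AggregateFunction<T, LagAggFunction.Acc<T>> {

    // Offset passed to the latest accumulate() call; assumed constant per call site.
    protected Integer offset = 1;

    /** Accumulator: buffers the current row plus the `offset` rows before it. */
    public static class Acc<T> {
        public LinkedList<T> window = new LinkedList<>();
    }

    @Override
    public Acc<T> createAccumulator() {
        return new Acc<>();
    }

    @Override
    public T getValue(Acc<T> acc) {
        // Fewer than `offset` preceding rows seen so far: LAG is NULL.
        if (acc.window.size() - 1 < this.offset) {
            return null;
        }
        // The oldest buffered value lies exactly `offset` rows back.
        return acc.window.getFirst();
    }

    public void accumulate(Acc<T> acc, T iValue, Integer offset) {
        this.offset = (offset == null) ? 1 : offset;
        acc.window.add(iValue);
        // Evict values that are more than `offset` rows old.
        while (acc.window.size() - 1 > this.offset) {
            acc.window.removeFirst();
        }
    }

    public void resetAccumulator(Acc<T> acc) {
        acc.window = new LinkedList<>();
    }

    /** Concrete subclass so that the result type (FLOAT) can be inferred. */
    public static class FloatLag extends LagAggFunction<Float> {}
}
```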

> LEAD/LAG cannot work correctly in streaming mode
> ------------------------------------------------
>
>                 Key: FLINK-19449
>                 URL: https://issues.apache.org/jira/browse/FLINK-19449
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Runtime
>    Affects Versions: 1.10.2, 1.11.2
>            Reporter: Benchao Li
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
