[ https://issues.apache.org/jira/browse/FLINK-19449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17297055#comment-17297055 ]
laiyijie commented on FLINK-19449:
----------------------------------

LI,

An implementation without the index may not be ideal, but it works as a temporary solution. The `retract` and `merge` functions are not that important for LAG.

I suggest implementing a LAG function with an index. In streaming mode, it would cover more situations than one without an index.

I have tried to implement LAG as a UDF without `retract` and `merge`, but a UDF has a restriction: a single function name `LAG` cannot support different data types.

I tested the UDF in streaming mode, and the result is correct. I think the `accumulate` function is called in order in streaming mode in an over window with ORDER BY.

Ultimately, the problem I want to solve is that LAG cannot be used in streaming mode, with the index of course.

Thanks for reading. I really want to fix this problem and make Flink's basic functions better.

Here is my UDF of `LAG` with index.

import java.util.LinkedList;

import org.apache.flink.table.functions.AggregateFunction;

public class LagAggFunction<T> extends AggregateFunction<T, LagAggFunction.Acc<T>> {

    // Number of rows to look back; updated on every accumulate() call.
    // Note: this is kept on the function instance, not in the accumulator.
    protected Integer offset = 1;

    // Accumulator holding at most (offset + 1) of the most recent values.
    public static class Acc<T> {
        LinkedList<T> window = new LinkedList<>();
    }

    @Override
    public Acc<T> createAccumulator() {
        return new Acc<T>();
    }

    @Override
    public T getValue(Acc<T> acc) {
        // Fewer than (offset + 1) rows seen so far: no lagged value yet.
        if (acc.window.size() - 1 < this.offset) {
            return null;
        }
        // The oldest retained value is the one `offset` rows before the current row.
        return acc.window.getFirst();
    }

    public void accumulate(Acc<T> acc, T iValue, Integer offset) {
        // Fall back to an offset of 1 when none is given.
        this.offset = offset;
        if (this.offset == null) {
            this.offset = 1;
        }
        acc.window.add(iValue);
        // Drop the oldest value once more than (offset + 1) values are held.
        if (acc.window.size() - 1 > this.offset) {
            acc.window.removeFirst();
        }
    }

    public void resetAccumulator(Acc<T> acc) {
        acc.window = new LinkedList<T>();
    }

    // One concrete subclass is needed per data type.
    public static class FloatLag extends LagAggFunction<Float> {}
}

> LEAD/LAG cannot work correctly in streaming mode
> ------------------------------------------------
>
>                 Key: FLINK-19449
>                 URL: https://issues.apache.org/jira/browse/FLINK-19449
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Runtime
>    Affects Versions: 1.10.2, 1.11.2
>            Reporter: Benchao Li
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)