[ https://issues.apache.org/jira/browse/FLINK-19449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17297055#comment-17297055 ]
laiyijie commented on FLINK-19449:
----------------------------------
Li, an implementation without the index (offset) argument may not be ideal, but
it is a temporary solution. The `retract` and `merge` functions are not that
important for LAG. I suggest implementing LAG with the index argument: in
streaming mode it would cover more cases than a version without the index.
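For reference, LAG(value, offset) over an ordered partition returns the value
`offset` rows before the current row, or null if there are not enough earlier
rows. The plain-Java sketch below (class and method names are hypothetical,
not Flink API) shows the batch semantics, so the difference between the
default offset of 1 and an explicit index such as 2 is easy to see:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical reference model: batch semantics of LAG(value, offset) over
// rows that are already sorted by the ORDER BY key of the OVER window.
public class LagReference {

    public static <T> List<T> lag(List<T> rows, int offset) {
        List<T> out = new ArrayList<>(rows.size());
        for (int i = 0; i < rows.size(); i++) {
            // Null until at least `offset` earlier rows exist.
            out.add(i >= offset ? rows.get(i - offset) : null);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(10, 20, 30, 40);
        System.out.println(lag(rows, 1)); // [null, 10, 20, 30]
        System.out.println(lag(rows, 2)); // [null, null, 10, 20]
    }
}
```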
I have tried to implement LAG as a UDF without `retract` and `merge`, but UDFs
have a restriction: a single function name such as `LAG` cannot support
different input data types, so a separate function is needed for each type.
I tested the UDF in streaming mode and the results were correct. I believe that
in streaming mode, for an OVER window with ORDER BY, `accumulate` is called in
row order.
In the end, what I want to solve is the problem that LAG cannot be used in
streaming mode, with the index argument supported, of course.
Thanks for reading. I would really like to fix this problem and help make Flink
better in its basic functions.
Here is my `LAG` UDF with the index argument:
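    import java.util.LinkedList;

    import org.apache.flink.table.functions.AggregateFunction;

    // LAG(value, offset): returns the value `offset` rows before the current
    // row, or null if fewer than offset + 1 rows have been seen.
    // No retract/merge support.
    public class LagAggFunction<T> extends AggregateFunction<T, LagAggFunction.Acc<T>> {

        public static class Acc<T> {
            // The most recent offset + 1 values, oldest first.
            public LinkedList<T> window = new LinkedList<>();
            // Kept per accumulator instead of as mutable state on the shared
            // function instance.
            public int offset = 1;
        }

        @Override
        public Acc<T> createAccumulator() {
            return new Acc<>();
        }

        @Override
        public T getValue(Acc<T> acc) {
            // Not enough rows yet to look back `offset` rows.
            if (acc.window.size() - 1 < acc.offset) {
                return null;
            }
            return acc.window.getFirst();
        }

        public void accumulate(Acc<T> acc, T iValue, Integer offset) {
            // A null offset falls back to the default of 1.
            acc.offset = (offset == null) ? 1 : offset;
            acc.window.add(iValue);
            // Drop the oldest values once more than offset + 1 are buffered.
            while (acc.window.size() - 1 > acc.offset) {
                acc.window.removeFirst();
            }
        }

        public void resetAccumulator(Acc<T> acc) {
            acc.window.clear();
        }

        // One concrete subclass per input type (here Float), each registered
        // under its own function name.
        public static class FloatLag extends LagAggFunction<Float> {}
    }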
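To check the buffering logic row by row without a Flink job, here is a
Flink-free sketch that mirrors `accumulate` and `getValue` for a single key.
`LagBuffer` is a hypothetical name used only for illustration; it assumes rows
arrive in ORDER BY order, as observed above for OVER windows in streaming mode.
A `LinkedList` is used because, unlike `ArrayDeque`, it accepts null values:

```java
import java.util.Deque;
import java.util.LinkedList;

// Hypothetical Flink-free model of the LagAggFunction accumulator:
// it keeps the last offset + 1 values, and the oldest one is LAG(value, offset).
public class LagBuffer<T> {
    private final int offset;
    private final Deque<T> window = new LinkedList<>();

    public LagBuffer(int offset) {
        this.offset = offset;
    }

    // Mirrors accumulate(): append, then trim to at most offset + 1 values.
    public void add(T value) {
        window.addLast(value);
        while (window.size() - 1 > offset) {
            window.removeFirst();
        }
    }

    // Mirrors getValue(): null until offset + 1 values have been seen.
    public T current() {
        return window.size() - 1 < offset ? null : window.peekFirst();
    }

    public static void main(String[] args) {
        LagBuffer<Float> lag2 = new LagBuffer<>(2);
        for (float v : new float[] {1.0f, 2.0f, 3.0f, 4.0f}) {
            lag2.add(v);
            System.out.println(v + " -> " + lag2.current());
        }
        // 1.0 -> null
        // 2.0 -> null
        // 3.0 -> 1.0
        // 4.0 -> 2.0
    }
}
```

Because the result depends only on the last offset + 1 values seen, the
correctness of this approach relies on rows being accumulated in order.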
> LEAD/LAG cannot work correctly in streaming mode
> ------------------------------------------------
>
> Key: FLINK-19449
> URL: https://issues.apache.org/jira/browse/FLINK-19449
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API, Table SQL / Runtime
> Affects Versions: 1.10.2, 1.11.2
> Reporter: Benchao Li
> Priority: Major
>