[
https://issues.apache.org/jira/browse/FLINK-20405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stephan Ewen updated FLINK-20405:
---------------------------------
Summary: The LAG function in over window is not implemented correctly
(was: The LAG function in over window is not implements correctly)
> The LAG function in over window is not implemented correctly
> ------------------------------------------------------------
>
> Key: FLINK-20405
> URL: https://issues.apache.org/jira/browse/FLINK-20405
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.12.0
> Reporter: Leonard Xu
> Priority: Major
>
> The LAG(input, offset, default) function in an over window always returns
> the current row's input, no matter how the offset is set.
> After looking at the generated code for the function, I think the
> implementation is incorrect and needs to be fixed.
> {code:java}
> // the offset and default value are never used
> public UnboundedOverAggregateHelper$24(java.lang.Object[] references) throws Exception {
>   constant$14 = ((int) 1);
>   constant$14isNull = false;
>   constant$15 = ((org.apache.flink.table.data.binary.BinaryStringData) str$13);
>   constant$15isNull = false;
>   typeSerializer$19 =
>       (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
> }
>
> public void accumulate(org.apache.flink.table.data.RowData accInput) throws Exception {
>   org.apache.flink.table.data.binary.BinaryStringData field$21;
>   boolean isNull$21;
>   org.apache.flink.table.data.binary.BinaryStringData field$22;
>   isNull$21 = accInput.isNullAt(2);
>   field$21 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>   if (!isNull$21) {
>     field$21 = ((org.apache.flink.table.data.binary.BinaryStringData) accInput.getString(2));
>   }
>   field$22 = field$21;
>   if (!isNull$21) {
>     field$22 =
>         (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$19.copy(field$22));
>   }
>   if (agg0_leadlag != field$22) {
>     agg0_leadlag =
>         ((org.apache.flink.table.data.binary.BinaryStringData) typeSerializer$19.copy(field$22));
>   } ;
>   agg0_leadlagIsNull = isNull$21;
> }
> {code}
>
> The question comes from the user mailing list [1].
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkSQL-kafka-gt-dedup-gt-kafka-td39335.html
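
For reference, the behaviour the report expects from LAG(input, offset, default) can be sketched in plain Java. This is only an illustration of the semantics and not Flink's generated code or its eventual fix; the names LagSketch, LagAccumulator and lag are made up for this sketch, and it assumes rows arrive in window order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class LagSketch {

    /** Hypothetical accumulator that remembers the last `offset` inputs. */
    static final class LagAccumulator<T> {
        private final int offset;
        private final T defaultValue;
        // LinkedList is used because it accepts null inputs.
        private final LinkedList<T> buffer = new LinkedList<>();

        LagAccumulator(int offset, T defaultValue) {
            if (offset < 0) {
                throw new IllegalArgumentException("offset must be >= 0");
            }
            this.offset = offset;
            this.defaultValue = defaultValue;
        }

        /** Feeds the current row's input and returns the LAG value for that row. */
        T accumulate(T input) {
            buffer.addLast(input);
            if (buffer.size() > offset) {
                // The oldest buffered value is exactly `offset` rows behind the current row.
                return buffer.removeFirst();
            }
            // Fewer than `offset` rows precede the current row.
            return defaultValue;
        }
    }

    /** Applies LAG to an ordered list of inputs, one result per row. */
    static <T> List<T> lag(List<T> inputs, int offset, T defaultValue) {
        LagAccumulator<T> acc = new LagAccumulator<>(offset, defaultValue);
        List<T> result = new ArrayList<>();
        for (T input : inputs) {
            result.add(acc.accumulate(input));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c");
        System.out.println(lag(rows, 1, "N/A")); // prints [N/A, a, b]
        System.out.println(lag(rows, 0, "N/A")); // prints [a, b, c]
    }
}
```

With offset 0 the sketch returns the current row's input, which matches what the reported bug produces for every offset. For any positive offset the first `offset` rows should instead yield the default value.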
--
This message was sent by Atlassian Jira
(v8.3.4#803005)