Leonard Xu created FLINK-20405:
----------------------------------
Summary: The LAG function in over window is not implemented correctly
Key: FLINK-20405
URL: https://issues.apache.org/jira/browse/FLINK-20405
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.12.0
Reporter: Leonard Xu
For the LAG(input, offset, default) function in an over window, the result is
always the current row's input, no matter how the offset is set.
After looking at the generated code for the function, I think the
implementation is incorrect and needs to be fixed.
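For reference, the behavior I would expect from LAG can be sketched in plain Java as below. This is only an illustration of the semantics over one ordered partition, not Flink code; the class name LagSketch and the sample values are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: reference semantics of LAG
// over a single, already ordered partition.
final class LagSketch {

    // For row i, return the input of row (i - offset), or defaultValue
    // when that row does not exist in the partition.
    static <T> List<T> lag(List<T> inputs, int offset, T defaultValue) {
        List<T> result = new ArrayList<>(inputs.size());
        for (int i = 0; i < inputs.size(); i++) {
            int source = i - offset;
            boolean inRange = source >= 0 && source < inputs.size();
            result.add(inRange ? inputs.get(source) : defaultValue);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        // Expected result for offset 1: [dflt, a, b]
        System.out.println(lag(inputs, 1, "dflt"));
        // Offset 0 returns the current row's input, which is what the
        // issue reports for every offset: [a, b, c]
        System.out.println(lag(inputs, 0, "dflt"));
    }
}
```

With offset 1 the first row has no predecessor and gets the default value; the reported behavior matches the offset 0 case instead.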
{code:java}
// The offset and default value are never used: constant$14 and constant$15
// appear to hold them, but neither is read in accumulate().
public UnboundedOverAggregateHelper$24(java.lang.Object[] references) throws Exception {
  constant$14 = ((int) 1);
  constant$14isNull = false;
  constant$15 = ((org.apache.flink.table.data.binary.BinaryStringData) str$13);
  constant$15isNull = false;
  typeSerializer$19 =
      (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
}

public void accumulate(org.apache.flink.table.data.RowData accInput) throws Exception {
  org.apache.flink.table.data.binary.BinaryStringData field$21;
  boolean isNull$21;
  org.apache.flink.table.data.binary.BinaryStringData field$22;

  // Read the input field of the current row.
  isNull$21 = accInput.isNullAt(2);
  field$21 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
  if (!isNull$21) {
    field$21 = ((org.apache.flink.table.data.binary.BinaryStringData) accInput.getString(2));
  }
  field$22 = field$21;
  if (!isNull$21) {
    field$22 = (org.apache.flink.table.data.binary.BinaryStringData)
        (typeSerializer$19.copy(field$22));
  }

  // The aggregate is overwritten with the current row's value.
  if (agg0_leadlag != field$22) {
    agg0_leadlag = ((org.apache.flink.table.data.binary.BinaryStringData)
        typeSerializer$19.copy(field$22));
  }
  ;
  agg0_leadlagIsNull = isNull$21;
}
{code}
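To show what is missing in accumulate(), here is a minimal sketch of an accumulator that does use the offset and default value. It is not the proposed fix and not Flink's API: the class LagAccumulatorSketch and its methods are hypothetical, it assumes offset >= 0, and it keeps the buffered rows on the heap where a real implementation would have to keep them in the accumulator state.

```java
import java.util.Deque;
import java.util.LinkedList;

// Hypothetical accumulator, not Flink code: remembers the last
// (offset + 1) inputs so the value `offset` rows back can be returned.
final class LagAccumulatorSketch<T> {
    private final int offset;       // assumed to be >= 0
    private final T defaultValue;
    // LinkedList is used because it accepts null inputs (ArrayDeque does not).
    private final Deque<T> window = new LinkedList<>();

    LagAccumulatorSketch(int offset, T defaultValue) {
        this.offset = offset;
        this.defaultValue = defaultValue;
    }

    // Called once per row, in window order.
    void accumulate(T input) {
        window.addLast(input);
        if (window.size() > offset + 1) {
            window.removeFirst();
        }
    }

    // Value for the most recently accumulated row.
    T getValue() {
        // When the window is full, its oldest element is exactly
        // `offset` rows behind the current row.
        return window.size() == offset + 1 ? window.getFirst() : defaultValue;
    }

    public static void main(String[] args) {
        LagAccumulatorSketch<String> acc = new LagAccumulatorSketch<>(1, "dflt");
        for (String s : new String[] {"a", "b", "c"}) {
            acc.accumulate(s);
            System.out.println(s + " -> " + acc.getValue());
        }
    }
}
```

The generated code above keeps only a single field (agg0_leadlag), so it has no way to return a value from an earlier row.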
The question comes from the user mailing list [1].
[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkSQL-kafka-gt-dedup-gt-kafka-td39335.html