[ https://issues.apache.org/jira/browse/FLINK-20100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he closed FLINK-20100.
------------------------------
Resolution: Duplicate
> Lag aggregate function does not return lag, but current row
> -----------------------------------------------------------
>
> Key: FLINK-20100
> URL: https://issues.apache.org/jira/browse/FLINK-20100
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Table SQL / Planner
> Affects Versions: 1.11.2, 1.12.0
> Reporter: Thilo Schneider
> Priority: Minor
> Labels: auto-deprioritized-major
>
> The lag aggregate function seems to always return the value from the current row
> instead of the value from the preceding row:
> {code:python}
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
> # Bounded source: foo = 1..10, with an event-time attribute derived from foo
> t_env.execute_sql("""
> CREATE TABLE datagen (
>     foo INT,
>     message_time AS to_timestamp(from_unixtime(foo)),
>     WATERMARK FOR message_time AS message_time
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second' = '3',
>     'fields.foo.kind' = 'sequence',
>     'fields.foo.start' = '1',
>     'fields.foo.end' = '10'
> )""")
>
> # lagfoo should hold the previous row's foo (NULL for the first row)
> t = t_env.sql_query("""
> SELECT foo, lag(foo) OVER w AS lagfoo
> FROM datagen
> WINDOW w AS (ORDER BY message_time)""")
>
> t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')")
> t.execute_insert("output")
> {code}
> This produces the following output:
> {noformat}
> +I(1,1) // Expected (1, null)
> +I(2,2) // Expected (2, 1)
> +I(3,3) // Expected (3, 2)
> +I(4,4) // and so on
> +I(5,5)
> +I(6,6)
> +I(7,7)
> +I(8,8)
> +I(9,9)
> +I(10,10)
> {noformat}
>
>
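For comparison, here is a minimal pure-Python sketch of the result the reporter expects, assuming standard `LAG(expression [, offset] [, default])` semantics with offset 1 and a NULL default, evaluated over rows already sorted by `message_time`. It does not use Flink at all; the `lag` helper and the `expected`/`observed` names are hypothetical and exist only to make the expected-vs-actual gap in the report explicit.

```python
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def lag(values: Iterable[T], offset: int = 1, default: Optional[T] = None) -> List[Optional[T]]:
    """Hypothetical helper: for each position i, return values[i - offset],
    or `default` when i - offset < 0.

    Mirrors SQL LAG(expr, offset, default) over rows that are already
    sorted by the window's ORDER BY key (message_time in the report).
    """
    if offset < 0:
        raise ValueError("offset must be non-negative")
    seq = list(values)
    return [seq[i - offset] if i - offset >= 0 else default for i in range(len(seq))]


# foo = 1..10 in message_time order, as generated by the datagen table above.
foo = list(range(1, 11))
expected = list(zip(foo, lag(foo)))   # (foo, lagfoo) with LAG semantics
observed = [(n, n) for n in foo]      # what the print sink showed: +I(n,n)

for exp, obs in zip(expected, observed):
    marker = "" if exp == obs else "  <-- mismatch"
    print(f"expected {exp}, observed {obs}{marker}")
```

Every row is flagged as a mismatch, which matches the report: the observed `lagfoo` equals `foo` instead of the preceding row's `foo`.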
--
This message was sent by Atlassian Jira
(v8.3.4#803005)