[ 
https://issues.apache.org/jira/browse/FLINK-20128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20128:
-----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Data loss for over windows with rows unbounded preceding
> --------------------------------------------------------
>
>                 Key: FLINK-20128
>                 URL: https://issues.apache.org/jira/browse/FLINK-20128
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.11.2, 1.12.0
>            Reporter: Thilo Schneider
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> When using partitioned, unbounded over windows, all but one of the 
> partitions are dropped from the output:
> {code:python}
> # Setup
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
> t_env.get_config().get_configuration().set_integer("table.exec.resource.default-parallelism", 1)
> t_env.execute_sql("""
> CREATE TABLE datagen (
>  foo INT,
>  id AS mod(foo, 2),
>  message_time AS to_timestamp(from_unixtime(FLOOR(foo/2))),
>  WATERMARK FOR message_time AS message_time
> ) WITH (
>  'connector' = 'datagen',
>  'rows-per-second'='2',
>  'fields.foo.kind'='sequence',
>  'fields.foo.start'='0',
>  'fields.foo.end'='19'
> )""")
> t_env.execute_sql("CREATE TABLE output (foo INT, id INT, lagfoo INT) WITH ('connector' = 'print')")
> {code}
> Using bounded over windows, everything works as expected:
> {code:python}
> t = t_env.sql_query("""
>     SELECT foo, id, avg(foo) OVER w AS lagfoo 
>     FROM datagen 
>     WINDOW w AS (PARTITION BY id ORDER BY message_time
>                  ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)""")
> t.execute_insert("output")
> {code}
> yields
> {code:python}
> +I(0,0,0)
> +I(1,1,1)
> +I(2,0,1)
> +I(3,1,2)
> +I(4,0,3)
> ...
>  {code}
> If we change the window to unbounded preceding:
> {code:python}
> t = t_env.sql_query("""
>     SELECT foo, id, avg(foo) OVER w AS lagfoo 
>     FROM datagen 
>     WINDOW w AS (PARTITION BY id ORDER BY message_time
>                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)""")
> t.execute_insert("output")
> {code}
> we lose all rows with id == 1:
> {code:python}
> +I(0,0,0)
> +I(2,0,1)
> +I(4,0,2)
> +I(6,0,3)
> +I(8,0,4)
> ...
> {code}
> I observed this problem with various aggregate functions, under both 
> 1.11.2 and 1.12rc1.
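
For reference, the rows one would expect from both queries can be sketched in plain Python, without Flink. This is only an illustration under stated assumptions: the helper name over_window_avg is hypothetical, rows are fed in foo order (which matches message_time order within each id), and integer division is used to mirror the INT results shown in the report. The interleaving of the two partitions in real Flink output may differ.

```python
from collections import defaultdict

def over_window_avg(rows, preceding=None):
    """Reference AVG over a ROWS window, partitioned by id.

    rows: iterable of (foo, id), already ordered by message_time.
    preceding: number of preceding rows to include, or None for
               UNBOUNDED PRECEDING (hypothetical parameter).
    """
    history = defaultdict(list)  # id -> foo values seen so far
    out = []
    for foo, key in rows:
        history[key].append(foo)
        if preceding is None:
            window = history[key]
        else:
            window = history[key][-(preceding + 1):]
        # Assumption: integer division, matching the INT output in the report
        out.append((foo, key, sum(window) // len(window)))
    return out

# Same data as the datagen table: foo = 0..19, id = mod(foo, 2)
rows = [(foo, foo % 2) for foo in range(20)]
bounded = over_window_avg(rows, preceding=1)
unbounded = over_window_avg(rows)

for row in unbounded[:6]:
    print("+I(%d,%d,%d)" % row)
```

With the unbounded window, this reference produces rows for id == 1 as well as id == 0, and its id == 0 rows agree with the ones Flink printed above.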



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
