[ https://issues.apache.org/jira/browse/FLINK-18119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyeonseop Lee updated FLINK-18119:
----------------------------------
    Description: 
For a time-range-bounded OVER aggregation in a streaming query, such as the one below,
{code:scala}
table
  .window(Over partitionBy 'a orderBy 'rowtime preceding 1.hour as 'w)
  .select('a, aggregateFunction('b) over 'w)
{code}
the operator must keep the incoming records that fall within the preceding time
range in state, but older records are no longer needed and can be cleaned up.
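A minimal sketch of the state such an operator keeps for a single partition key, assuming a SUM aggregate and records arriving in timestamp order. The class RangeBoundedSum and its methods are hypothetical and only model the idea; this is not the actual RowTimeRangeBoundedPrecedingFunction code. Records are buffered by timestamp, and each new record at ts retracts the buffered records older than the window's lower bound ts - range:

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of one key's state in a RANGE-bounded
// OVER aggregate computing SUM over the preceding `rangeMs`.
class RangeBoundedSum {
    private final long rangeMs;
    // Buffered per-timestamp partial sums of the input values.
    private final TreeMap<Long, Long> buffer = new TreeMap<>();
    private long sum = 0;

    RangeBoundedSum(long rangeMs) {
        this.rangeMs = rangeMs;
    }

    /** Adds a record and returns SUM over the window [ts - rangeMs, ts]. */
    long onRecord(long ts, long value) {
        buffer.merge(ts, value, Long::sum);
        sum += value;
        long limit = ts - rangeMs;
        // Retract every buffered record that is older than the lower bound.
        Map<Long, Long> expired = buffer.headMap(limit, false);
        for (long v : expired.values()) {
            sum -= v;
        }
        expired.clear(); // clearing the view removes the entries from `buffer`
        return sum;
    }

    int bufferedTimestamps() {
        return buffer.size();
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        RangeBoundedSum agg = new RangeBoundedSum(hour);
        System.out.println(agg.onRecord(0L, 5));        // 5
        System.out.println(agg.onRecord(hour, 7));      // 12: ts=0 is still in [0, 1h]
        System.out.println(agg.onRecord(hour + 1, 1));  // 8: limit is 1, so ts=0 is retracted
        System.out.println(agg.bufferedTimestamps());   // 2
    }
}
{code}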

The current implementation retracts old records only when a newer record
arrives, because that is how the operator learns that enough time has passed.
However, the retraction never happens unless a new record with the same key
arrives, so the state may never be released. This leads to unboundedly growing
state, especially when the keyspace changes over time.
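The following hypothetical sketch (a simplified model, not Flink's code; the class and method names are made up for illustration) reproduces the problem: each key's buffer is pruned only when a new record for that same key arrives, so keys that appear once and never return keep their records forever.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: per-key buffers that are pruned only when a new
// record for the same key arrives.
class ArrivalOnlyCleanup {
    private final long rangeMs;
    // key -> (timestamp -> per-timestamp partial sum)
    private final Map<String, TreeMap<Long, Long>> state = new HashMap<>();

    ArrivalOnlyCleanup(long rangeMs) {
        this.rangeMs = rangeMs;
    }

    void onRecord(String key, long ts, long value) {
        TreeMap<Long, Long> buffer = state.computeIfAbsent(key, k -> new TreeMap<>());
        buffer.merge(ts, value, Long::sum);
        // Pruning happens here and only here, and only for this key.
        buffer.headMap(ts - rangeMs, false).clear();
    }

    int totalBufferedRecords() {
        int n = 0;
        for (TreeMap<Long, Long> buffer : state.values()) {
            n += buffer.size();
        }
        return n;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        ArrivalOnlyCleanup op = new ArrivalOnlyCleanup(hour);
        // Every hour a brand-new key appears (e.g. a session ID) and never returns.
        for (int i = 0; i < 1_000; i++) {
            op.onRecord("session-" + i, i * hour, 1L);
        }
        // All but the last two records are outside the 1-hour range, yet none was pruned.
        System.out.println(op.totalBufferedRecords()); // 1000
    }
}
{code}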

Since an aggregate over a bounded preceding time interval by its nature does
not need old records, we can improve this by adding a timer that notifies the
operator to retract old records, without changing query results or severely
degrading performance.
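One possible shape of the proposed timer, as a hypothetical sketch rather than the actual patch: on every record, register a cleanup timer at ts + range + 1; when it fires, retract everything older than the firing time minus the range, and drop the key entirely once nothing is left. The PriorityQueue stands in for the operator's timer service, the +1 offset is an assumption, and records are assumed not to be late (each record's timestamp is greater than the current watermark).

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical sketch of timer-driven cleanup for an event-time
// RANGE-bounded OVER aggregate computing SUM.
class TimerCleanupSum {
    private final long rangeMs;
    private final Map<String, TreeMap<Long, Long>> buffers = new HashMap<>();
    private final Map<String, Long> sums = new HashMap<>();
    // Stand-in for a timer service: pending {fireTime, key} ordered by fireTime.
    private final PriorityQueue<Map.Entry<Long, String>> timers =
        new PriorityQueue<>(Map.Entry.<Long, String>comparingByKey());

    TimerCleanupSum(long rangeMs) {
        this.rangeMs = rangeMs;
    }

    /** Returns SUM over [ts - rangeMs, ts] for this key. */
    long onRecord(String key, long ts, long value) {
        buffers.computeIfAbsent(key, k -> new TreeMap<>()).merge(ts, value, Long::sum);
        sums.merge(key, value, Long::sum);
        retractOlderThan(key, ts - rangeMs);
        // Once time passes ts + rangeMs, no future row's window contains this one.
        timers.add(Map.entry(ts + rangeMs + 1, key));
        return sums.get(key);
    }

    /** Fires every pending timer whose time is <= watermark. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().getKey() <= watermark) {
            Map.Entry<Long, String> timer = timers.poll();
            retractOlderThan(timer.getValue(), timer.getKey() - rangeMs);
        }
    }

    private void retractOlderThan(String key, long limit) {
        TreeMap<Long, Long> buffer = buffers.get(key);
        if (buffer == null) {
            return; // state for this key was already cleared
        }
        Map<Long, Long> expired = buffer.headMap(limit, false);
        long retracted = 0;
        for (long v : expired.values()) {
            retracted += v;
        }
        expired.clear();
        if (buffer.isEmpty()) {
            // Nothing left in range: release all state for the key.
            buffers.remove(key);
            sums.remove(key);
        } else {
            sums.merge(key, -retracted, Long::sum);
        }
    }

    int keysWithState() {
        return buffers.size();
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        TimerCleanupSum op = new TimerCleanupSum(hour);
        for (int i = 0; i < 1_000; i++) {
            op.onRecord("session-" + i, i * hour, 1L);
            op.advanceWatermark(i * hour); // watermark follows the latest record
        }
        // Only keys whose last record may still fall inside a future window remain.
        System.out.println(op.keysWithState()); // 2
    }
}
{code}

A record retracted by the timer is already outside the window of any future row for that key, so the next emitted result is the same as without timers: a record arriving after cleanup starts from an empty accumulator, exactly as if the normal on-arrival retraction had removed the old records.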

This is distinct from state retention: state retention forgets state that is
expected to be less important in order to reduce state size, so it may change
query results. This change makes sense whether state retention is enabled or
disabled.
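To illustrate the difference, a hypothetical single-key comparison: idle-state retention is modeled here simply as forgetting the key after retentionMs without input, which is a simplification of Flink's actual retention settings. With a 1-hour range and a 10-minute retention, a record that is still inside the range gets forgotten, so the result changes; range-based cleanup alone never drops such a record.

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Hypothetical comparison for one key: range-based retraction vs. a
// simplified idle-state retention that forgets the key after `retentionMs`.
class RetentionVsRangeCleanup {

    /** Returns the SUM emitted for the last record; records must be in timestamp order. */
    static long sumAtLast(long[] ts, long[] values, long rangeMs, long retentionMs) {
        TreeMap<Long, Long> buffer = new TreeMap<>();
        long lastSeen = Long.MIN_VALUE;
        long sum = 0;
        for (int i = 0; i < ts.length; i++) {
            // Simplified retention: forget everything if the key was idle too long.
            if (lastSeen != Long.MIN_VALUE && ts[i] - lastSeen > retentionMs) {
                buffer.clear();
                sum = 0;
            }
            lastSeen = ts[i];
            buffer.merge(ts[i], values[i], Long::sum);
            sum += values[i];
            // Range-based retraction: drop only records the window no longer covers.
            Map<Long, Long> expired = buffer.headMap(ts[i] - rangeMs, false);
            for (long v : expired.values()) {
                sum -= v;
            }
            expired.clear();
        }
        return sum;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] ts = {0L, 30 * minute};
        long[] values = {5L, 7L};
        // 1-hour range, retention disabled: both records are in range.
        System.out.println(sumAtLast(ts, values, 60 * minute, Long.MAX_VALUE)); // 12
        // Same range, but state is forgotten after 10 idle minutes.
        System.out.println(sumAtLast(ts, values, 60 * minute, 10 * minute));    // 7
    }
}
{code}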

This issue applies to both row-time and processing-time range bounds. That is,
we are going to change both RowTimeRangeBoundedPrecedingFunction and
ProcTimeRangeBoundedPrecedingFunction in flink-table-runtime-blink. I already
have a version with this change running in production and would be glad to
contribute it.

  was:
For a time-range-bounded OVER aggregation in a streaming query, such as the one below,
{code:scala}
table
  .window(Over partitionBy 'a orderBy 'rowtime preceding 1.hour as 'w)
  .select('a, aggregateFunction('b) over 'w)
{code}
the operator must keep the incoming records that fall within the preceding time
range in state, but older records are no longer needed and can be cleaned up.

The current implementation cleans up old records only when a newer record
arrives, because that is how the operator learns that enough time has passed.
However, the cleanup never happens unless a new record with the same key
arrives, so the state may never be cleaned up. This leads to unboundedly
growing state, especially when the keyspace changes over time.

Since an aggregate over a bounded preceding time interval by its nature does
not need old records, we can improve this by adding a timer that notifies the
operator to clean up old records, without changing query results or severely
degrading performance.

This is distinct from state retention: state retention forgets state that is
expected to be less important in order to reduce state size, so it may change
query results. This change makes sense whether state retention is enabled or
disabled.

This issue applies to both row-time and processing-time range bounds. That is,
we are going to change both RowTimeRangeBoundedPrecedingFunction and
ProcTimeRangeBoundedPrecedingFunction in flink-table-runtime-blink. I already
have a version with this change running in production and would be glad to
contribute it.


> Fix unlimitedly growing state for time range bounded over aggregate
> -------------------------------------------------------------------
>
>                 Key: FLINK-18119
>                 URL: https://issues.apache.org/jira/browse/FLINK-18119
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.1
>            Reporter: Hyeonseop Lee
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
