[ https://issues.apache.org/jira/browse/FLINK-18119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140290#comment-17140290 ]
Jark Wu commented on FLINK-18119:
---------------------------------
As discussed in the pull request, we changed the state TTL behavior of
{{ProcTimeRangeBoundedPrecedingFunction}} and
{{RowTimeRangeBoundedPrecedingFunction}}. They now expire state automatically when
no more data arrives within the bounded (event-time or processing-time) window range,
instead of relying on {{TableConfig.setIdleStateRetentionTime}}.
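The sketch below illustrates why range-driven expiry keeps results intact while idle-state retention may not. It is a hypothetical plain-Scala model, not Flink code. `ExpiryContrast`, `rangeExpire`, `idleExpire`, and `sumInRange` are illustrative names. `idleExpire` is also a simplification: it drops all of a key's rows once the key has been idle longer than `retentionMs`. It does not describe how Flink actually implements idle state retention.

```scala
// Hypothetical model (not Flink code) of expiring one key's buffered rows in a
// RANGE-bounded OVER aggregate computing SUM(value) over [ts - rangeMs, ts].
object ExpiryContrast {
  final case class Row(ts: Long, value: Long)

  // Range-driven expiry: drop only rows that can no longer fall in any range.
  def rangeExpire(rows: List[Row], now: Long, rangeMs: Long): List[Row] =
    rows.filter(_.ts >= now - rangeMs)

  // Simplified idle-retention expiry: drop all of the key's rows once the
  // key has been idle for longer than retentionMs.
  def idleExpire(rows: List[Row], now: Long, retentionMs: Long): List[Row] =
    if (rows.nonEmpty && now - rows.map(_.ts).max > retentionMs) Nil else rows

  // Aggregate emitted for a new row, using whatever buffered rows survived.
  def sumInRange(buffered: List[Row], row: Row, rangeMs: Long): Long =
    (row :: buffered).filter(_.ts >= row.ts - rangeMs).map(_.value).sum
}
```

Consider a one-hour range and a 30-minute retention, with buffered rows (0 min, 1) and (10 min, 2). A new row (50 min, 4) then sums to 7 when only out-of-range rows are expired, which matches the result with no expiry at all. After idle-retention expiry it sums to 4, because the key was idle for 40 minutes.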
> Fix unlimitedly growing state for time range bounded over aggregate
> -------------------------------------------------------------------
>
> Key: FLINK-18119
> URL: https://issues.apache.org/jira/browse/FLINK-18119
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.1
> Reporter: Hyeonseop Lee
> Assignee: Hyeonseop Lee
> Priority: Major
> Labels: pull-request-available
>
> For a time-range-bounded over aggregation in a streaming query, such as the following,
> {code:scala}
> // For each row, aggregate 'b over rows with the same 'a in the preceding hour
> table
>   .window(Over partitionBy 'a orderBy 'rowtime preceding 1.hour as 'w)
>   .select('a, aggregateFunction('b) over 'w)
> {code}
> the operator must keep the incoming records from the preceding time range in
> state, but older records are no longer needed and can be cleaned up.
> The current implementation retracts old records only when a newer record
> arrives and thereby tells the operator that enough time has passed. However,
> this retraction never happens unless a new record with the same key arrives,
> so the state of a key that stops receiving records may never be released. This
> leads to unboundedly growing state, especially when the key space changes over time.
> Since an aggregate over a bounded preceding time interval does not need old
> records by its nature, we can improve this by adding a timer that notifies
> the operator to retract old records. This changes neither the query results
> nor performance significantly.
> This is distinct from state retention. State retention discards state that
> is expected to be less important in order to reduce state size, so it may
> change query results. Both enabling and disabling state retention remain
> meaningful with this change.
> This issue applies to both row-time and processing-time range-bounded over aggregates.
> That is, we are going to change both
> RowTimeRangeBoundedPrecedingFunction and
> ProcTimeRangeBoundedPrecedingFunction in flink-table-runtime-blink. I already
> have a version with this change running in production and would be glad to
> contribute it.
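The timer-based cleanup proposed in the issue can be modeled in plain Scala. This is a hypothetical sketch, not the code in RowTimeRangeBoundedPrecedingFunction. `RangeOverSum`, `process`, `onWatermark`, and the `cleanupTimers` flag are illustrative names. The model assumes that records arrive in timestamp order per key. It also assumes that a cleanup timer for a row at `ts` fires once the watermark reaches `ts + rangeMs + 1`. Only row time is modeled; a processing-time variant would use the wall clock instead of the watermark.

```scala
import scala.collection.mutable

// Hypothetical, simplified model (not Flink code) of a row-time RANGE-bounded
// OVER aggregate: SUM(value) over the same key's rows in [ts - rangeMs, ts].
final class RangeOverSum(rangeMs: Long, cleanupTimers: Boolean) {
  // Per-key buffer: timestamp -> values received at that timestamp.
  private val state = mutable.Map.empty[String, mutable.TreeMap[Long, List[Long]]]
  // Pending cleanup timers as (fireTime, key).
  private val timers = mutable.Set.empty[(Long, String)]

  // Drops the key's rows with timestamp < bound; removes the key if none remain.
  private def evictOlderThan(key: String, bound: Long): Unit =
    state.get(key).foreach { buf =>
      buf.keysIterator.takeWhile(_ < bound).toList.foreach(buf.remove)
      if (buf.isEmpty) state.remove(key)
    }

  // Processes one record and returns the aggregate emitted for it.
  def process(key: String, ts: Long, value: Long): Long = {
    val buf = state.getOrElseUpdate(key, mutable.TreeMap.empty[Long, List[Long]])
    buf.update(ts, value :: buf.getOrElse(ts, Nil))
    // Existing behavior: retract old rows only when a new row for this key arrives.
    evictOlderThan(key, ts - rangeMs)
    // Proposed behavior: also schedule cleanup for when this row leaves the range.
    if (cleanupTimers) timers += ((ts + rangeMs + 1, key))
    buf.valuesIterator.map(_.sum).sum
  }

  // Advances the watermark and fires due cleanup timers.
  def onWatermark(wm: Long): Unit = {
    val due = timers.filter(_._1 <= wm).toList
    timers --= due
    // Any later row has ts' > fireTime = ts + rangeMs + 1, so its range starts
    // after ts + 1: rows older than fireTime - rangeMs are never needed again.
    due.foreach { case (fireTime, key) => evictOlderThan(key, fireTime - rangeMs) }
  }

  def bufferedRows: Int = state.valuesIterator.map(_.valuesIterator.map(_.size).sum).sum
  def liveKeys: Set[String] = state.keySet.toSet
}
```

Suppose both variants receive the same three records for keys "a" and "b", and the watermark then advances far past the one-hour range without further records. The variant without timers still buffers all three rows. The variant with timers buffers none. In both variants every `process` call returns the same aggregate, including a later record for "a".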
--
This message was sent by Atlassian Jira
(v8.3.4#803005)