[ 
https://issues.apache.org/jira/browse/FLINK-37633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hugo Ricateau updated FLINK-37633:
----------------------------------
    Affects Version/s: 2.2.1
                       1.20.4
                           (was: 2.0.0)
                           (was: 1.19.2)
                           (was: 1.20.1)

> Improved state management for temporal join operator
> ----------------------------------------------------
>
>                 Key: FLINK-37633
>                 URL: https://issues.apache.org/jira/browse/FLINK-37633
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Configuration, Runtime / Queryable State, 
> Stateful Functions, Table SQL / Runtime
>    Affects Versions: 1.20.4, 2.2.1
>            Reporter: Hugo Ricateau
>            Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When performing a temporal join between a left stream and a right versioned 
> table, as the watermark of the join operator progresses, the deprecated 
> entries of the right table (the updated rows whose end of validity precedes 
> the operator watermark) are purged from the state; this is documented 
> [here|https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/joins/#event-time-temporal-join],
>  in the last paragraph (emphasis mine):
> {quote}In contrast to regular joins, the previous temporal table results will 
> not be affected despite the changes on the build side. Compared to interval 
> joins, temporal table joins do not define a time window within which the 
> records will be joined. Records from the probe side are always joined with 
> the build side’s version at the time specified by the time attribute. Thus, 
> rows on the build side might be arbitrarily old. As time passes, {*}no longer 
> needed versions of the record (for the given primary key) will be removed 
> from the state{*}.
> {quote}
> However, while this feature is important to keep the state size under 
> control, it imposes constraints that are not suitable for all use-cases.
> In my use-case, on the one hand, the left stream is heavily out of order with 
> respect to business time: events arrive in event-time order, but their 
> business validity time may lie far in the past (it is always older than the 
> event timestamp). On the other hand, the upsert right stream produces a 
> versioned table, where each version of a row is valid from its update event 
> time until the next update.
> As order does not matter on the left stream, and as we would like to release 
> its events as soon as possible, we declared the watermark as {{WATERMARK 
> FOR <business-validity-time> AS <event-time>}}. But since the state of the 
> join operator does not retain "deprecated" versions of the rows in the right 
> table, some events on the left stream cannot find the appropriate right-side 
> row to join with, because of their "old" business validity time.
> No workaround is possible: either the state is purged of expired versions or 
> pending events are not released, since both mechanisms are controlled by the 
> same single parameter, {{currentWatermark}}. Hence, I would suggest {*}adding 
> a configuration value that allows delaying removal of deprecated versions of 
> a record from the state{*}.
> If you agree with this improvement proposal, I volunteer to implement it; 
> here is a draft of how I would do it:
>  * Add a {{long outdatedRetentionTime}} parameter to the 
> {{TemporalRowTimeJoinOperator}} class 
> [constructor|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L126-L143];
>  * Use this interval to delay the passed {{currentWatermark}} in [this 
> call|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L273]
>  to {{cleanupExpiredVersionInState}} (a negative delay would disable the 
> cleanup entirely);
>  * Add a {{table.exec.state.outdated-versions.retention}} configuration value 
> [here|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java#L60];
>  * And pass it to the {{TemporalRowTimeJoinOperator}} 
> [constructor|https://github.com/apache/flink/blob/5aaff07fe925b289a2cabe9269a015ebc2255223/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java#L268-L276],
>  following the same pattern as for {{minRetentionTime}}.
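
The draft above can be illustrated with a minimal, self-contained sketch. It is not Flink code: the class {{VersionedStateSketch}} and its methods are hypothetical stand-ins for the right-side state of one primary key, and only the {{outdatedRetentionTime}} name and the "negative value disables cleanup" rule come from the proposal. The sketch assumes that cleanup keeps the version valid at the (delayed) watermark and drops everything older.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the versions kept for one primary key; not Flink's operator. */
final class VersionedStateSketch {
    // Versions ordered by the time at which each one becomes valid.
    private final TreeMap<Long, String> versions = new TreeMap<>();
    // Proposed parameter: how far behind the watermark cleanup should lag.
    private final long outdatedRetentionTime;

    VersionedStateSketch(long outdatedRetentionTime) {
        this.outdatedRetentionTime = outdatedRetentionTime;
    }

    void upsert(long validFrom, String row) {
        versions.put(validFrom, row);
    }

    /** Returns the version valid at the given time, or null if none is retained. */
    String lookup(long time) {
        Map.Entry<Long, String> entry = versions.floorEntry(time);
        return entry == null ? null : entry.getValue();
    }

    /** Drops versions that ended before (currentWatermark - outdatedRetentionTime). */
    void cleanupExpiredVersions(long currentWatermark) {
        if (outdatedRetentionTime < 0) {
            return; // assumption taken from the proposal: negative delay disables cleanup
        }
        if (currentWatermark < Long.MIN_VALUE + outdatedRetentionTime) {
            return; // the delayed watermark would underflow; nothing can be expired yet
        }
        long delayedWatermark = currentWatermark - outdatedRetentionTime;
        // The version valid at the delayed watermark must stay; older ones can go.
        Long stillValid = versions.floorKey(delayedWatermark);
        if (stillValid != null) {
            versions.headMap(stillValid, false).clear();
        }
    }

    int size() {
        return versions.size();
    }

    public static void main(String[] args) {
        for (long retention : new long[] {0L, 15L, -1L}) {
            VersionedStateSketch state = new VersionedStateSketch(retention);
            state.upsert(10L, "v1");
            state.upsert(20L, "v2");
            state.upsert(30L, "v3");
            state.cleanupExpiredVersions(25L);
            // A late left-side event with business validity time 15 needs "v1".
            System.out.println("retention=" + retention + " lookup(15)=" + state.lookup(15L));
        }
    }
}
```

With a retention of 0 the lookup at time 15 finds nothing once the watermark reaches 25, which is the situation described in the issue; with a retention of 15 (or a negative value) the old version is still available. An actual change would additionally have to read the value from the proposed {{table.exec.state.outdated-versions.retention}} option and pass it through the planner.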



--
This message was sent by Atlassian Jira
(v8.20.10#820010)