Hugo Ricateau created FLINK-37633:
-------------------------------------
Summary: Improved state management for temporal join operator
Key: FLINK-37633
URL: https://issues.apache.org/jira/browse/FLINK-37633
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration, Runtime / Queryable State,
Stateful Functions, Table SQL / Runtime
Affects Versions: 1.20.1, 1.19.2, 2.0.0
Reporter: Hugo Ricateau
When performing a temporal join between a left stream and a right versioned
table, as the watermark of the join operator progresses, the deprecated entries
of the right table (the updated rows whose end of validity precedes the
operator watermark) are purged from the state; this is documented
[here|https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/joins/#event-time-temporal-join],
in the last paragraph (emphasis mine):
{quote}In contrast to regular joins, the previous temporal table results will
not be affected despite the changes on the build side. Compared to interval
joins, temporal table joins do not define a time window within which the
records will be joined. Records from the probe side are always joined with the
build side’s version at the time specified by the time attribute. Thus, rows on
the build side might be arbitrarily old. As time passes, {*}no longer needed
versions of the record (for the given primary key) will be removed from the
state{*}.
{quote}
However, while this feature is important to keep the state size under control,
it imposes constraints that are not suitable for all use cases.
In my use case, the left stream is strongly out of order with respect to its
time attribute: events actually arrive in order, but their business validity
time might be far in the past (it is always older than the event timestamp).
The upsert right stream, on the other hand, produces a versioned table, where
each version of a row is valid from the update event time (up to the next
update).
As order does not matter on the left stream, and as we would like to release
its events as soon as possible, we declared the watermark as {{WATERMARK FOR
<business-validity-time> AS <event-time>}}. But since the state of the join
operator does not retain "deprecated" versions of the rows in the right table,
some events on the left stream are consequently unable to find the appropriate
right-side row to join with, due to their “old” business validity time.
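To make the failure mode concrete, here is a toy model of the right-side state
for a single primary key. It is only a sketch under my own assumptions, not the
actual operator code: the class {{VersionedStateToy}} and its methods are
hypothetical names, and versions are simply keyed by the time from which they
become valid.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of one primary key's versions on the right side; not Flink code. */
final class VersionedStateToy {
    // Versions keyed by the event time from which each one becomes valid.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void upsert(long validFrom, String row) {
        versions.put(validFrom, row);
    }

    /** Drops every version whose validity ended at or before the watermark. */
    void purge(long watermark) {
        // The version valid at the watermark must be kept; older ones are dropped.
        Long stillValid = versions.floorKey(watermark);
        if (stillValid != null) {
            versions.headMap(stillValid, false).clear();
        }
    }

    /** Returns the version valid at the given time, or null if none is left. */
    String lookup(long time) {
        Map.Entry<Long, String> entry = versions.floorEntry(time);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedStateToy state = new VersionedStateToy();
        state.upsert(10L, "v1");
        state.upsert(20L, "v2");
        state.upsert(30L, "v3");

        // A left event with business validity time 12 matches v1 before the purge.
        System.out.println(state.lookup(12L));

        // Once the operator watermark reaches 35, v1 and v2 are purged.
        state.purge(35L);

        // The same left event now finds no right-side row to join with.
        System.out.println(state.lookup(12L));
    }
}
```

In this model, a left event with an "old" business validity time loses its
match as soon as the watermark has moved past the end of validity of the
version it needs.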
David Anderson suggested the following workaround to deal with such situations:
pinning the watermark of the right stream to a constant timestamp far in the
future ensures that no version of a row of the right table is ever purged from
the state (as the watermark of the join operator will always be way behind the
watermark of the right stream). As a result, the watermark of the join operator
follows that of the left stream, releasing its events as they arrive. However,
this solution has two severe drawbacks:
* As the watermark on the right stream has been fixed far in the future, it
breaks the ability to preserve order between the events of the left and right
streams (e.g. when they arise from a CDC connector and insert operations on the
right table necessarily precede those on the left table to fulfil foreign key
constraints). Another workaround is to introduce an artificial delay on the
left stream, to increase the likelihood that right-side events are processed
before the corresponding left-side ones; but this offers no guarantee and
introduces undesirable lag in the pipeline.
* Moreover, this is an all-or-nothing retention policy for "deprecated"
versions of the right table: it offers no fine-grained control, such as purging
only the versions that have been deprecated for longer than a configurable
grace period.
I would like to suggest {*}adding a configuration value that allows delaying
removal of deprecated versions of a record from the state{*}.
If you agree with this improvement proposal, I volunteer to implement it; here
is a draft of how I would do it:
* Add a {{long outdatedRetentionTime}} parameter to the
{{TemporalRowTimeJoinOperator}} class
[constructor|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L126-L143];
* Use this interval to delay the passed {{currentWatermark}} in [this
call|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java#L273]
to {{cleanupExpiredVersionInState}} (a negative delay disabling the cleanup
entirely);
* Add a {{table.exec.state.outdated-versions.retention}} configuration value
[here|https://github.com/apache/flink/blob/ec810badf19dc6eeef46230dfcb690c88bf211cd/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java#L60];
* And pass it to the {{TemporalRowTimeJoinOperator}}
[constructor|https://github.com/apache/flink/blob/5aaff07fe925b289a2cabe9269a015ebc2255223/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java#L268-L276],
following the same pattern as for {{minRetentionTime}}.
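The core of the draft above could be sketched as follows. This is a standalone
illustration under my own assumptions, not a patch against
{{TemporalRowTimeJoinOperator}}: the class {{OutdatedVersionRetentionSketch}}
and its two methods are hypothetical, the state of one primary key is modelled
as a plain {{TreeMap}}, and only the {{outdatedRetentionTime}} parameter comes
from the proposal.

```java
import java.util.TreeMap;

/** Standalone sketch of the proposed delayed cleanup; not Flink code. */
final class OutdatedVersionRetentionSketch {

    /**
     * Watermark to use for cleanup: the operator watermark delayed by the
     * retention time. A negative retention time disables cleanup entirely.
     */
    static long cleanupThreshold(long currentWatermark, long outdatedRetentionTime) {
        if (outdatedRetentionTime < 0) {
            return Long.MIN_VALUE; // nothing is ever older than this
        }
        long delayed = currentWatermark - outdatedRetentionTime;
        // Guard against underflow when the watermark is close to Long.MIN_VALUE.
        return delayed > currentWatermark ? Long.MIN_VALUE : delayed;
    }

    /** Drops versions whose validity ended at or before the delayed watermark. */
    static void purge(
            TreeMap<Long, String> versions, long currentWatermark, long outdatedRetentionTime) {
        long threshold = cleanupThreshold(currentWatermark, outdatedRetentionTime);
        // Keep the version that is valid at the threshold, drop the older ones.
        Long stillValid = versions.floorKey(threshold);
        if (stillValid != null) {
            versions.headMap(stillValid, false).clear();
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(10L, "v1");
        versions.put(20L, "v2");
        versions.put(30L, "v3");

        // With a grace period of 10, cleanup at watermark 35 behaves as if the
        // watermark were 25, so the version that started at 20 is retained.
        purge(versions, 35L, 10L);
        System.out.println(versions.keySet());
    }
}
```

With a retention time of zero this reduces to the current behaviour in the
model, which is why a default of zero would presumably keep existing pipelines
unchanged.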