[
https://issues.apache.org/jira/browse/FLINK-23170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roman Khachatryan updated FLINK-23170:
--------------------------------------
Description:
Currently, changelog state backend writes state metadata on first state access.
It is written to the changelog
On materialization, the changelog can be truncated, so the metadata needs to
be written again.
There are several questions to answer:
- *When* to trigger the write (when to call append(metadata))?
At any point (mat. start / mat. end / checkpoint start). It doesn't matter
for correctness - see the next points.
Scheduling append earlier means:
-- including metadata in changelog twice unnecesserily (won't hurt correctness)
-- writing for nothing if materialization fails
Scheduling append later means slowing down the checkpoint
So at materialization end seem to be a better tradeoff.
- *What* metadata to write?
All that was already written ("states.foreach(if md was written write
again);")
- *Where* in changelog to write it to?
No choice but to the end of the changelog. On recovery, it is read first
- *How to wait for write completion* (before completing checkpoint)?
Once appended, the future returned from persist() call should include it
already
So to achieve this it's enough to call appendMetadata() for each changed state
upon materialization start, or finish, or 1st checkpoint after it.
It can be further optimized by storing the SQN at which the metadata was
written and only resetting the flag if materializedSqn >= metadataSqn; but
materialization is relatively rare so it probably doesn't worth it.
—
Another related change is to skip writing metadata on recovery (only if state
was read from the changelog).
This can be achieved by setting the flag when requesting the state from
ChangeLogApplier.
*Please create a separate ticket for that if not implementing in this one.*
—
Note: with TM-side state ownership, actual log truncation may be delayed after
materialization (until all the checkpoints using the log are subsumed). This
should not affect the above logic.
was:
Currently, changelog state backend writes state metadata on first state access.
It is written to the changelog
On materialization, the changelog can be truncated, so the metadata needs to
be written again.
There are several questions to answer:
- *When* to trigger the write (when to call append(metadata))?
At any point (mat. start / mat. end / checkpoint start). It doesn't matter
for correctness - see the next points.
Scheduling append earlier means:
- including metadata in changelog twice unnecesserily (won't hurt correctness)
- writing for nothing if materialization fails
Scheduling append later means:
- slowing down the checkpoint
So at materialization end seem to be a better tradeoff.
- *What* metadata to write?
All that was already written ("states.foreach(if md was written write
again);")
- *Where* in changelog to write it to?
No choice but to the end of the changelog. On recovery, it is read first
- *How to wait for write completion* (before completing checkpoint)?
Once appended, the future returned from persist() call should include it
already
So to achieve this it's enough to call appendMetadata() for each changed state
upon materialization start, or finish, or 1st checkpoint after it.
It can be further optimized by storing the SQN at which the metadata was
written and only resetting the flag if materializedSqn >= metadataSqn; but
materialization is relatively rare so it probably doesn't worth it.
—
Another related change is to skip writing metadata on recovery (only if state
was read from the changelog).
This can be achieved by setting the flag when requesting the state from
ChangeLogApplier.
*Please create a separate ticket for that if not implementing in this one.*
—
Note: with TM-side state ownership, actual log truncation may be delayed after
materialization (until all the checkpoints using the log are subsumed). This
should not affect the above logic.
> Write metadata after materialization
> ------------------------------------
>
> Key: FLINK-23170
> URL: https://issues.apache.org/jira/browse/FLINK-23170
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / State Backends
> Reporter: Roman Khachatryan
> Priority: Major
> Fix For: 1.14.0
>
>
> Currently, changelog state backend writes state metadata on first state
> access. It is written to the changelog
> On materialization, the changelog can be truncated, so the metadata needs to
> be written again.
> There are several questions to answer:
> - *When* to trigger the write (when to call append(metadata))?
> At any point (mat. start / mat. end / checkpoint start). It doesn't
> matter for correctness - see the next points.
> Scheduling append earlier means:
> -- including metadata in changelog twice unnecesserily (won't hurt
> correctness)
> -- writing for nothing if materialization fails
> Scheduling append later means slowing down the checkpoint
> So at materialization end seem to be a better tradeoff.
> - *What* metadata to write?
> All that was already written ("states.foreach(if md was written write
> again);")
> - *Where* in changelog to write it to?
> No choice but to the end of the changelog. On recovery, it is read first
> - *How to wait for write completion* (before completing checkpoint)?
> Once appended, the future returned from persist() call should include it
> already
>
> So to achieve this it's enough to call appendMetadata() for each changed
> state upon materialization start, or finish, or 1st checkpoint after it.
> It can be further optimized by storing the SQN at which the metadata was
> written and only resetting the flag if materializedSqn >= metadataSqn; but
> materialization is relatively rare so it probably doesn't worth it.
> —
> Another related change is to skip writing metadata on recovery (only if
> state was read from the changelog).
> This can be achieved by setting the flag when requesting the state from
> ChangeLogApplier.
> *Please create a separate ticket for that if not implementing in this one.*
> —
> Note: with TM-side state ownership, actual log truncation may be delayed
> after materialization (until all the checkpoints using the log are subsumed).
> This should not affect the above logic.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)