[ 
https://issues.apache.org/jira/browse/FLINK-25261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464286#comment-17464286
 ] 

Yuan Mei edited comment on FLINK-25261 at 12/23/21, 4:54 AM:
-------------------------------------------------------------

{quote}1. Correct (as I wrote above, in the context of this ticket I mean 
in-memory data only)
2. As I wrote above, in the context of this ticket I mean in-memory data only
{quote}
If truncating only means truncating the in-memory part, would this API be 
general enough for other implementations as well (e.g. an in-memory 
implementation or a Kafka-based implementation)?
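To make the question concrete, here is a minimal Java sketch. It assumes a hypothetical, simplified writer interface: `SketchChangelogWriter`, `InMemoryWriter` and `ExternalLogWriter` are illustrative names, not Flink's actual `StateChangelogWriter` API. It shows how the same `truncate()` call frees memory in one implementation but, in a writer backed by an external log such as Kafka (modeled here as a plain list), can only advance a local watermark, leaving record deletion to the external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified writer interface (NOT Flink's StateChangelogWriter API).
interface SketchChangelogWriter {
    // Appends a change and returns the sequence number assigned to it.
    long append(byte[] change);

    // Drops changes with sequence number < upToExclusive that this writer still holds.
    void truncate(long upToExclusive);

    // Number of changes still held in local memory.
    int retainedInMemory();
}

// In-memory writer: truncate() actually frees the buffered changes.
class InMemoryWriter implements SketchChangelogWriter {
    private final NavigableMap<Long, byte[]> buffer = new TreeMap<>();
    private long nextSeq = 0;

    @Override
    public long append(byte[] change) {
        buffer.put(nextSeq, change);
        return nextSeq++;
    }

    @Override
    public void truncate(long upToExclusive) {
        // headMap(..., false) is a live view; clearing it removes the entries from buffer.
        buffer.headMap(upToExclusive, false).clear();
    }

    @Override
    public int retainedInMemory() {
        return buffer.size();
    }
}

// Writer backed by an external durable log (e.g. a Kafka topic), modeled here as a list.
// Changes are shipped on append, so truncate() can only advance a local low watermark;
// removing the remote records is assumed to be up to the external system.
class ExternalLogWriter implements SketchChangelogWriter {
    private final List<byte[]> remoteLog = new ArrayList<>();
    private long lowWatermark = 0;

    @Override
    public long append(byte[] change) {
        remoteLog.add(change);
        return remoteLog.size() - 1;
    }

    @Override
    public void truncate(long upToExclusive) {
        lowWatermark = Math.max(lowWatermark, upToExclusive);
    }

    @Override
    public int retainedInMemory() {
        return 0; // nothing is buffered locally in this model
    }

    long lowWatermark() {
        return lowWatermark;
    }

    int remoteSize() {
        return remoteLog.size();
    }
}
```

In this model, an "in-memory only" truncate is a no-op for the external-log writer, which is exactly the generality concern raised above.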
{quote}3. In-memory means any in-memory object that is no longer needed for 
checkpointing, depending on the DSTL implementation (a reference to a file, or an 
in-memory byte array to be flushed). To discard changes that were already flushed 
but not included in any checkpoint, we have three options: a) shared/private state 
ownership and a TM-side registry (FLINK-23139); b) a TM-side registry for changelog 
only; c) rely on FLINK-24852 (which will likely be needed by FLINK-25395). 
I propose to postpone this decision until we decide on FLINK-25395 and state 
ownership. Note that this only happens if pre-emptive upload is enabled 
(otherwise, state changes are always associated with some checkpoint).
{quote}
I am fine with postponing this decision.
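For reference while the decision is postponed, option b) (a TM-side registry for changelog only) could look roughly like the sketch below. It assumes a hypothetical `ChangelogFileRegistry` class, not an existing Flink class: a pre-emptively uploaded file is discarded only once materialization has made its changes obsolete and no checkpoint still references it. Actual deletion from DFS is replaced by recording the file name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical TM-side registry for pre-emptively uploaded changelog files (option b).
// Names and semantics are illustrative only.
class ChangelogFileRegistry {
    // file -> ids of checkpoints that still reference it
    private final Map<String, Set<Long>> references = new HashMap<>();
    // files whose changes are already covered by a completed materialization
    private final Set<String> truncated = new HashSet<>();
    private final Set<String> discarded = new HashSet<>();

    // A pre-emptively uploaded file starts without any checkpoint referencing it.
    void registerUpload(String file) {
        references.putIfAbsent(file, new HashSet<>());
    }

    void includeInCheckpoint(String file, long checkpointId) {
        references.get(file).add(checkpointId);
    }

    // Checkpoint subsumed or aborted: it no longer pins any file.
    void releaseCheckpoint(long checkpointId) {
        for (Set<Long> ids : references.values()) {
            ids.remove(checkpointId);
        }
        discardUnreferenced();
    }

    // Materialization made this file's changes obsolete.
    void markTruncated(String file) {
        truncated.add(file);
        discardUnreferenced();
    }

    // A file can be discarded once it is truncated AND no checkpoint references it.
    private void discardUnreferenced() {
        Iterator<Map.Entry<String, Set<Long>>> it = references.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<Long>> entry = it.next();
            if (truncated.contains(entry.getKey()) && entry.getValue().isEmpty()) {
                discarded.add(entry.getKey()); // a real registry would delete the DFS file here
                truncated.remove(entry.getKey());
                it.remove();
            }
        }
    }

    Set<String> discarded() {
        return discarded;
    }
}
```

A file that was flushed but never made it into a checkpoint is dropped as soon as it is truncated, while a file used by a checkpoint waits until that checkpoint is released.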
{quote}4. Materialization is independent, but handling its result is 
"synchronized" with checkpointing by using the Task mailbox; so it's only the 
writer.truncate() method that should take ongoing checkpoints into account, and 
this is the case with the current FS writer.
{quote}
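The mailbox hand-off described in point 4 can be sketched as follows. `SketchMailbox` and `MaterializationSketch` are hypothetical stand-ins for the Task mailbox and the materialization trigger, not Flink classes. The (slow) materialization runs on another thread, and only the resulting truncation is enqueued and later executed by the task thread, so it cannot interleave with other task-thread actions such as triggering a checkpoint. Note that this alone does not cover asynchronous checkpoint uploads that are still running outside the task thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical stand-in for the Task mailbox: any thread may put mails,
// but only the task thread runs them (by calling processOne).
class SketchMailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    void put(Runnable mail) {
        mails.add(mail);
    }

    // Runs the next mail on the calling (task) thread, waiting up to timeoutMs for one.
    boolean processOne(long timeoutMs) {
        try {
            Runnable mail = mails.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (mail == null) {
                return false;
            }
            mail.run();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Hypothetical materialization trigger: the snapshot work runs on asyncExecutor,
// while handling its result (truncating the log) is handed over to the task thread.
class MaterializationSketch {
    static CompletableFuture<Void> materializeAsync(
            Executor asyncExecutor, SketchMailbox mailbox, long materializedUpTo, LongConsumer truncate) {
        return CompletableFuture
                .runAsync(() -> { /* write the materialized snapshot here */ }, asyncExecutor)
                .thenRun(() -> mailbox.put(() -> truncate.accept(materializedUpTo)));
    }
}
```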
Let's consider this:

{noformat}
          New CP
          |
--|-------|----------> (In Mem Log)
  |
  Materialization up to (but not complete yet)
{noformat}

1). Materialization is triggered

2). A checkpoint is triggered (the uploading part does not run in the task thread)

3). Materialization finishes and puts a truncation action into the mailbox

4). The task thread truncates the log

5). The checkpoint completes
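One way to read this scenario: the checkpoint triggered in step 2 may still be uploading changes from the in-memory log when step 4 truncates it. A checkpoint-aware `truncate()` would defer discarding those changes until step 5. The sketch below assumes a hypothetical `CheckpointAwareWriter` that records, for each in-flight checkpoint, the lowest sequence number it still needs and clamps truncation to that; it illustrates the idea and does not describe how Flink's FS writer is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical writer whose truncate() never drops changes an in-flight checkpoint may still upload.
class CheckpointAwareWriter {
    private final NavigableMap<Long, String> log = new TreeMap<>();
    // checkpointId -> lowest sequence number that checkpoint still needs
    private final Map<Long, Long> inFlightFrom = new HashMap<>();
    private long nextSeq = 0;
    private long requestedTruncation = 0; // highest truncation point requested so far (exclusive)

    long append(String change) {
        log.put(nextSeq, change);
        return nextSeq++;
    }

    // Step 2: a checkpoint is triggered; its upload runs outside the task thread
    // and needs changes from 'from' onwards until it finishes.
    void checkpointStarted(long checkpointId, long from) {
        inFlightFrom.put(checkpointId, from);
    }

    // Step 4: materialization up to 'to' (exclusive) finished; request truncation.
    void truncate(long to) {
        requestedTruncation = Math.max(requestedTruncation, to);
        applyTruncation();
    }

    // Step 5: the checkpoint completed (or was aborted) and no longer pins its changes.
    void checkpointFinished(long checkpointId) {
        inFlightFrom.remove(checkpointId);
        applyTruncation();
    }

    // Truncate only up to the lowest sequence number still needed by an in-flight checkpoint.
    private void applyTruncation() {
        long limit = requestedTruncation;
        for (long from : inFlightFrom.values()) {
            limit = Math.min(limit, from);
        }
        log.headMap(limit, false).clear();
    }

    boolean contains(long seq) {
        return log.containsKey(seq);
    }

    int size() {
        return log.size();
    }
}
```

Replaying steps 1 to 5: after appending changes 0 to 5, a checkpoint that needs changes from 0 is started and then truncate(3) is requested; nothing is dropped until checkpointFinished() is called, after which changes 0 to 2 are removed.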

=====================

Also, if I understand correctly, we now perform cleanup in three different 
places:

1). State cleanup upon checkpoint subsumption

2). In-memory cleanup when materialization completes

3). Cleanup of DFS files not included in any state (TBD)

Again, is this abstraction general enough to support other 
implementations? It seems a bit fragile to me.


> Changelog not truncated on materialization
> ------------------------------------------
>
>                 Key: FLINK-25261
>                 URL: https://issues.apache.org/jira/browse/FLINK-25261
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.15.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> [https://github.com/apache/flink/blob/dcc4d43e413b20f70036e73c61d52e2e1c5afee7/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java#L640]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
