[ 
https://issues.apache.org/jira/browse/FLINK-24436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17423195#comment-17423195
 ] 

Yuan Mei commented on FLINK-24436:
----------------------------------

This is because `rollover()` increases the `activeSequenceNumber`

{code:java}
private void rollover() {
    if (activeChangeSet.isEmpty()) {
        return;
    }
    notUploaded.put(
            activeSequenceNumber,
            new StateChangeSet(logId, activeSequenceNumber, activeChangeSet));
    activeSequenceNumber = activeSequenceNumber.next();
    LOG.debug("bump active sqn to {}", activeSequenceNumber);
    activeChangeSet = new ArrayList<>();
    activeChangeSetSize = 0;
}
{code}

while
{code}
public SequenceNumber lastAppendedSequenceNumber() {
    LOG.debug("query {} sqn: {}", logId, activeSequenceNumber);
    SequenceNumber tmp = activeSequenceNumber;
    // the returned current sequence number must be able to distinguish between 
the changes
    // appended before and after this call so we need to use the next sequence 
number
    // At the same time, we don't want to increment SQN on each append (to 
avoid too many
    // objects and segments in the resulting file).
    rollover();
    return tmp;
}
{code}

return the old tmp value first time `lastAppendedSequenceNumber()` is called.

> FsStateChangelogWriter#lastAppendedSequenceNumber return different seq number 
> with no writes
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24436
>                 URL: https://issues.apache.org/jira/browse/FLINK-24436
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Yuan Mei
>            Priority: Major
>
> Test code: 
> [https://github.com/apache/flink/pull/16606/commits/3d32e902cee493a984bc052b76dfec984743921f]
> LOG:
> {code:java}
> 2157 [main] INFO  org.apache.flink.changelog.fs.FsStateChangelogWriter [] - 
> append to 00000000-0000-0000-0000-000000000000: keyGroup=-1 844 bytes
> 2163 [main] INFO  org.apache.flink.changelog.fs.FsStateChangelogWriter [] - 
> append to 00000000-0000-0000-0000-000000000000: keyGroup=8 17 bytes
> 2163 [main] INFO  
> org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct 
> append, last appended to 0
> 2163 [main] INFO  org.apache.flink.changelog.fs.FsStateChangelogWriter [] - 
> append to 00000000-0000-0000-0000-000000000000: keyGroup=3 17 bytes
> 2163 [main] INFO  
> org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct 
> append, last appended to 1
> 2163 [main] INFO  
> org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct 
> append, last appended to 2
> 2163 [main] INFO  
> org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct 
> append, last appended to 2
> 2163 [main] INFO  org.apache.flink.changelog.fs.FsStateChangelogWriter [] - 
> append to 00000000-0000-0000-0000-000000000000: keyGroup=3 17 bytes
> 2163 [main] INFO  
> org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct 
> append, last appended to 2
> 2163 [main] INFO  org.apache.flink.changelog.fs.FsStateChangelogWriter [] - 
> append to 00000000-0000-0000-0000-000000000000: keyGroup=5 17 bytes
> 2163 [main] INFO  
> org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct 
> append, last appended to 3
> 2163 [main] INFO  
> org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct 
> append, last appended to 4
> 2164 [main] INFO  
> org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct 
> append, last appended to 4
> {code}
> Problem:
> 1. getLastAppendedTo() does not return the same seq number if no writes.
> 2. Materialization depends on `if 
> (upTo.compareTo(changelogSnapshotState.lastMaterializedTo()) > 0)` to decide 
> whether really perform materialisation.  This will cause some undefined 
> behavior if I call getLastAppendedTo() twice 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to