[ https://issues.apache.org/jira/browse/FLINK-24436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17423195#comment-17423195 ]
Yuan Mei commented on FLINK-24436:
----------------------------------
This is because `rollover()` increments `activeSequenceNumber`:
{code:java}
private void rollover() {
    if (activeChangeSet.isEmpty()) {
        // nothing appended since the last rollover: keep the current SQN
        return;
    }
    // seal the active change set under the current SQN ...
    notUploaded.put(
            activeSequenceNumber,
            new StateChangeSet(logId, activeSequenceNumber, activeChangeSet));
    // ... and advance the SQN for subsequent appends
    activeSequenceNumber = activeSequenceNumber.next();
    LOG.debug("bump active sqn to {}", activeSequenceNumber);
    activeChangeSet = new ArrayList<>();
    activeChangeSetSize = 0;
}
{code}
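while `lastAppendedSequenceNumber()` saves the current value in `tmp` before calling `rollover()`:
{code:java}
public SequenceNumber lastAppendedSequenceNumber() {
    LOG.debug("query {} sqn: {}", logId, activeSequenceNumber);
    SequenceNumber tmp = activeSequenceNumber;
    // the returned current sequence number must be able to distinguish between the changes
    // appended before and after this call so we need to use the next sequence number
    // At the same time, we don't want to increment SQN on each append (to avoid too many
    // objects and segments in the resulting file).
    rollover();
    return tmp;
}
{code}
As a result, the first call after an append returns the old `tmp` value even though `rollover()` has already advanced `activeSequenceNumber`. The next call, with no writes in between, returns the advanced value, so two consecutive calls without writes return different sequence numbers. Further calls return that same value, because `rollover()` returns early when `activeChangeSet` is empty.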
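To make the numbering concrete, here is a minimal, hypothetical model of the two methods above; it is not Flink code. `SequenceNumber` is simplified to a `long`, uploads and logging are dropped, and the class name `SqnModel` and its `append`/`replayLog` helpers are made up for illustration. Replaying the order of appends and queries from the log in the quoted issue description should yield the same "last appended to" values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of the SQN logic quoted above; NOT the Flink class.
// SequenceNumber is simplified to a long and nothing is actually uploaded.
final class SqnModel {
    private long activeSequenceNumber = 0;
    private List<String> activeChangeSet = new ArrayList<>();
    // Stand-in for notUploaded: change sets sealed by rollover(), keyed by SQN.
    private final TreeMap<Long, List<String>> notUploaded = new TreeMap<>();

    void append(String change) {
        activeChangeSet.add(change);
    }

    private void rollover() {
        if (activeChangeSet.isEmpty()) {
            return; // no appends since the last rollover: SQN is not bumped
        }
        notUploaded.put(activeSequenceNumber, activeChangeSet);
        activeSequenceNumber++;
        activeChangeSet = new ArrayList<>();
    }

    long lastAppendedSequenceNumber() {
        long tmp = activeSequenceNumber; // captured BEFORE rollover()
        rollover();                      // may advance activeSequenceNumber
        return tmp;
    }

    // Replays the order of appends and queries seen in the issue's log.
    static List<Long> replayLog() {
        SqnModel w = new SqnModel();
        List<Long> seen = new ArrayList<>();
        w.append("keyGroup=-1");
        w.append("keyGroup=8");
        seen.add(w.lastAppendedSequenceNumber()); // 0, SQN bumped to 1
        w.append("keyGroup=3");
        seen.add(w.lastAppendedSequenceNumber()); // 1, SQN bumped to 2
        seen.add(w.lastAppendedSequenceNumber()); // 2: no writes, yet differs from the previous call
        seen.add(w.lastAppendedSequenceNumber()); // 2
        w.append("keyGroup=3");
        seen.add(w.lastAppendedSequenceNumber()); // 2, SQN bumped to 3
        w.append("keyGroup=5");
        seen.add(w.lastAppendedSequenceNumber()); // 3, SQN bumped to 4
        seen.add(w.lastAppendedSequenceNumber()); // 4
        seen.add(w.lastAppendedSequenceNumber()); // 4
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(replayLog()); // [0, 1, 2, 2, 2, 3, 4, 4]
    }
}
```

The second and third queries differ only because the second one triggered the bump; no change was appended between them, which is exactly problem 1 in the issue description.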
> FsStateChangelogWriter#lastAppendedSequenceNumber return different seq number with no writes
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-24436
> URL: https://issues.apache.org/jira/browse/FLINK-24436
> Project: Flink
> Issue Type: Bug
> Reporter: Yuan Mei
> Priority: Major
>
> Test code:
> [https://github.com/apache/flink/pull/16606/commits/3d32e902cee493a984bc052b76dfec984743921f]
> LOG:
> {code:java}
> 2157 [main] INFO org.apache.flink.changelog.fs.FsStateChangelogWriter [] - append to 00000000-0000-0000-0000-000000000000: keyGroup=-1 844 bytes
> 2163 [main] INFO org.apache.flink.changelog.fs.FsStateChangelogWriter [] - append to 00000000-0000-0000-0000-000000000000: keyGroup=8 17 bytes
> 2163 [main] INFO org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct append, last appended to 0
> 2163 [main] INFO org.apache.flink.changelog.fs.FsStateChangelogWriter [] - append to 00000000-0000-0000-0000-000000000000: keyGroup=3 17 bytes
> 2163 [main] INFO org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct append, last appended to 1
> 2163 [main] INFO org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct append, last appended to 2
> 2163 [main] INFO org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct append, last appended to 2
> 2163 [main] INFO org.apache.flink.changelog.fs.FsStateChangelogWriter [] - append to 00000000-0000-0000-0000-000000000000: keyGroup=3 17 bytes
> 2163 [main] INFO org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct append, last appended to 2
> 2163 [main] INFO org.apache.flink.changelog.fs.FsStateChangelogWriter [] - append to 00000000-0000-0000-0000-000000000000: keyGroup=5 17 bytes
> 2163 [main] INFO org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct append, last appended to 3
> 2163 [main] INFO org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct append, last appended to 4
> 2164 [main] INFO org.apache.flink.state.changelog.ChangelogStateBackendTestUtils [] - direct append, last appended to 4
> {code}
> Problem:
> 1. getLastAppendedTo() does not return the same sequence number across calls when nothing has been written in between.
> 2. Materialization relies on `if (upTo.compareTo(changelogSnapshotState.lastMaterializedTo()) > 0)` to decide whether to actually perform materialization, so calling getLastAppendedTo() twice without any writes leads to undefined behavior.
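Point 2 can be illustrated with a similar hypothetical model (again not Flink code). The writer keeps only the rollover logic, `SequenceNumber` is a `long`, `lastMaterializedTo` starts at -1 to mean "nothing materialized yet" (an assumption), and materialization is treated as completing immediately. The class name `MaterializationGuardModel` and its methods are made up for illustration. After a single append, the second guarded attempt passes the `upTo > lastMaterializedTo` check even though nothing new was written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT Flink code: how the guard
// upTo.compareTo(lastMaterializedTo) > 0 reacts to the writer's numbering.
final class MaterializationGuardModel {
    private long activeSequenceNumber = 0;
    private int pendingChanges = 0;       // size of the active change set
    private long lastMaterializedTo = -1; // assumption: -1 = nothing materialized yet

    void append() {
        pendingChanges++;
    }

    long lastAppendedSequenceNumber() {
        long tmp = activeSequenceNumber;
        if (pendingChanges > 0) { // rollover(): bumps only when changes are pending
            activeSequenceNumber++;
            pendingChanges = 0;
        }
        return tmp;
    }

    // Returns true if the guard lets materialization run for this attempt.
    boolean maybeMaterialize() {
        long upTo = lastAppendedSequenceNumber();
        if (upTo > lastMaterializedTo) { // same comparison as the guard
            lastMaterializedTo = upTo;   // simplification: completes immediately
            return true;
        }
        return false;
    }

    static List<Boolean> scenario() {
        MaterializationGuardModel m = new MaterializationGuardModel();
        List<Boolean> decisions = new ArrayList<>();
        m.append();
        decisions.add(m.maybeMaterialize()); // upTo = 0: materializes the appended change
        decisions.add(m.maybeMaterialize()); // upTo = 1: materializes again, no new writes
        decisions.add(m.maybeMaterialize()); // upTo = 1: skipped
        return decisions;
    }

    public static void main(String[] args) {
        System.out.println(scenario()); // [true, true, false]
    }
}
```

Whether the second attempt runs depends only on how many times the sequence number was queried, not on whether any state changed.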
--
This message was sent by Atlassian Jira
(v8.3.4#803005)