[ 
https://issues.apache.org/jira/browse/FLINK-26799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511760#comment-17511760
 ] 

Feifan Wang edited comment on FLINK-26799 at 3/24/22, 10:26 AM:
----------------------------------------------------------------

Thanks very much for the reply, [~roman].

StateChangelogStorageTest.testWriteAndRead as written is not enough to expose the problem, 
but with two small modifications the problem can be reproduced:
 # change the keyLen from 10 to {color:#de350b}405{color}
 # call {color:#de350b}writer.nextSequenceNumber(){color} after processing 
every entry of appendsByKeyGroup

The complete StateChangelogStorageTest.testWriteAndRead with the above modifications is:
{code:java}
@MethodSource("parameters")
@ParameterizedTest(name = "compression = {0}")
public void testWriteAndRead(boolean compression) throws Exception {
    KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
    Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 405, 20);

    try (StateChangelogStorage<T> client = getFactory(compression, temporaryFolder);
            StateChangelogWriter<T> writer =
                    client.createWriter(
                            new OperatorID().toString(), kgRange, new SyncMailboxExecutor())) {
        SequenceNumber prev = writer.initialSequenceNumber();
        for (Map.Entry<Integer, List<byte[]>> entry : appendsByKeyGroup.entrySet()) {
            Integer group = entry.getKey();
            List<byte[]> appends = entry.getValue();
            for (byte[] bytes : appends) {
                writer.append(group, bytes);
            }
            writer.nextSequenceNumber();
        }

        T handle = writer.persist(prev).get();
        StateChangelogHandleReader<T> reader = client.createReader();

        assertByteMapsEqual(appendsByKeyGroup, extract(handle, reader));
    }
}
{code}
To reproduce the problem, apply the above modifications and run 
{color:#de350b}FsStateChangelogStorageTest{color}.


> StateChangeFormat#read not seek to offset correctly
> ---------------------------------------------------
>
>                 Key: FLINK-26799
>                 URL: https://issues.apache.org/jira/browse/FLINK-26799
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>            Reporter: Feifan Wang
>            Priority: Major
>         Attachments: image-2022-03-24-18-12-09-742.png
>
>
> StateChangeFormat#read must seek to the offset before reading. The current 
> implementation is as follows:
>  
> {code:java}
> FSDataInputStream stream = handle.openInputStream();
> DataInputViewStreamWrapper input = wrap(stream);
> if (stream.getPos() != offset) {
>     LOG.debug("seek from {} to {}", stream.getPos(), offset);
>     input.skipBytesToRead((int) offset);
> }{code}
> But the if condition is incorrect: stream.getPos() returns the position of the 
> underlying stream, which differs from the position of input.
> Because the stream is wrapped in a BufferedInputStream, the position of the 
> underlying stream is always at n*bufferSize or at the end of the file. 
> In fact, input is always at position 0 at the beginning, so I think we can seek 
> to the offset directly.
>  
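
The position mismatch described above can be illustrated with plain java.io classes. The following is only a minimal sketch, not Flink code: CountingStream is a hypothetical stand-in for FSDataInputStream that counts the bytes pulled from the source, and the class and method names are assumptions made for this example. It shows that the underlying position jumps to the buffer size (or the end of the data) after a single buffered read, and that skipping on the freshly created wrapper reaches the requested offset.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

class BufferedPositionDemo {

    /** Hypothetical stand-in for FSDataInputStream: tracks bytes consumed from the source. */
    static final class CountingStream extends InputStream {
        private final InputStream in;
        private long pos;

        CountingStream(byte[] data) {
            this.in = new ByteArrayInputStream(data);
        }

        long getPos() {
            return pos;
        }

        @Override
        public int read() throws IOException {
            int b = in.read();
            if (b >= 0) {
                pos++;
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = in.read(buf, off, len);
            if (n > 0) {
                pos += n;
            }
            return n;
        }

        @Override
        public long skip(long n) throws IOException {
            long skipped = in.skip(n);
            pos += skipped;
            return skipped;
        }
    }

    /** Reads one byte through a buffer and reports where the underlying stream ended up. */
    static long underlyingPosAfterOneRead(byte[] data, int bufferSize) throws IOException {
        CountingStream raw = new CountingStream(data);
        BufferedInputStream buffered = new BufferedInputStream(raw, bufferSize);
        buffered.read(); // the wrapper has consumed 1 byte, but the buffer was filled in one go
        return raw.getPos();
    }

    /** Seeks on the wrapper directly (it starts at logical position 0) and reads one byte. */
    static int readByteAt(byte[] data, int bufferSize, int offset) throws IOException {
        CountingStream raw = new CountingStream(data);
        DataInputStream input = new DataInputStream(new BufferedInputStream(raw, bufferSize));
        int skipped = input.skipBytes(offset);
        if (skipped != offset) {
            throw new IOException("could only skip " + skipped + " of " + offset + " bytes");
        }
        return input.readUnsignedByte();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[20];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i; // each byte stores its own index
        }
        System.out.println("underlying position after one read: "
                + underlyingPosAfterOneRead(data, 8));
        System.out.println("byte read at offset 13: " + readByteAt(data, 8, 13));
    }
}
```

Because the underlying position reflects buffer fills rather than what the wrapper has handed out, comparing it with the target offset can give the wrong answer; skipping on the wrapper itself avoids relying on that value.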



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
