[ https://issues.apache.org/jira/browse/FLINK-26799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511760#comment-17511760 ]

Feifan Wang edited comment on FLINK-26799 at 3/24/22, 10:52 AM:
----------------------------------------------------------------

Thanks very much for the reply, [~roman].

StateChangelogStorageTest.testWriteAndRead is not enough to reveal the problem, but with a small modification it can be reproduced:
 # change the keyLen from 10 to {color:#de350b}405{color}
 # {color:#172b4d}call writer.nextSequenceNumber(){color} after processing each entry of appendsByKeyGroup

The complete StateChangelogStorageTest.testWriteAndRead with the above modifications is:
{code:java}
@MethodSource("parameters")
@ParameterizedTest(name = "compression = {0}")
public void testWriteAndRead(boolean compression) throws Exception {
    KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
    // modification 1: keyLen changed from 10 to 405
    Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 405, 20);

    try (StateChangelogStorage<T> client = getFactory(compression, temporaryFolder);
            StateChangelogWriter<T> writer =
                    client.createWriter(
                            new OperatorID().toString(), kgRange, new SyncMailboxExecutor())) {
        SequenceNumber prev = writer.initialSequenceNumber();
        for (Map.Entry<Integer, List<byte[]>> entry : appendsByKeyGroup.entrySet()) {
            Integer group = entry.getKey();
            List<byte[]> appends = entry.getValue();
            for (byte[] bytes : appends) {
                writer.append(group, bytes);
            }
            // modification 2: advance the sequence number after each key group's appends
            writer.nextSequenceNumber();
        }

        // persist everything written since the initial sequence number, then read it back
        T handle = writer.persist(prev).get();
        StateChangelogHandleReader<T> reader = client.createReader();

        assertByteMapsEqual(appendsByKeyGroup, extract(handle, reader));
    }
}
{code}
Running FsStateChangelogStorageTest with the above modifications reproduces this problem.


> StateChangeFormat#read not seek to offset correctly
> ---------------------------------------------------
>
>                 Key: FLINK-26799
>                 URL: https://issues.apache.org/jira/browse/FLINK-26799
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>            Reporter: Feifan Wang
>            Priority: Major
>         Attachments: image-2022-03-24-18-12-09-742.png
>
>
> StateChangeFormat#read must seek to the offset before reading. The current
> implementation is as follows:
>
> {code:java}
> FSDataInputStream stream = handle.openInputStream();
> DataInputViewStreamWrapper input = wrap(stream);
> if (stream.getPos() != offset) {
>     LOG.debug("seek from {} to {}", stream.getPos(), offset);
>     input.skipBytesToRead((int) offset);
> }{code}
> But the if condition is incorrect: stream.getPos() returns the position of the
> underlying stream, which is different from the position of input.
> By the way, because the stream is wrapped by a BufferedInputStream, the position
> of the underlying stream is always at n*bufferSize or at the end of the file.
> Actually, input is always at position 0 at the beginning, so I think we can seek
> to the offset directly.
>
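To make the point about the two positions concrete, here is a minimal, self-contained Java sketch that does not depend on Flink. PositionTrackingInputStream is a hypothetical stand-in for FSDataInputStream#getPos(), and a DataInputStream over a BufferedInputStream plays the role of the wrapped input; all class and method names are illustrative assumptions, not Flink code. Reading a single byte through the buffered wrapper moves the underlying position to 4096 (one full buffer) for a 10,000-byte source, and to 1,000 (end of file) for a 1,000-byte source, while the logical position of the input is only 1. readWithUnderlyingPosCheck mirrors the reported if condition after a hypothetical earlier read, and readAtOffset follows the suggested approach of skipping straight to the offset from position 0.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

final class BufferedPositionDemo {

    /** Hypothetical stand-in for FSDataInputStream#getPos(): counts bytes consumed from the source. */
    static final class PositionTrackingInputStream extends FilterInputStream {
        private long pos;

        PositionTrackingInputStream(InputStream in) {
            super(in);
        }

        long getPos() {
            return pos;
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b >= 0) {
                pos++;
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) {
                pos += n;
            }
            return n;
        }

        @Override
        public long skip(long n) throws IOException {
            long skipped = super.skip(n);
            pos += skipped;
            return skipped;
        }
    }

    /** Sample "file" where byte i holds (byte) i, so a value hints at where it was read from. */
    static byte[] sampleData(int length) {
        byte[] data = new byte[length];
        for (int i = 0; i < length; i++) {
            data[i] = (byte) i;
        }
        return data;
    }

    /** Reads one byte through the buffered wrapper and reports the underlying position. */
    static long underlyingPosAfterOneByte(byte[] data, int bufferSize) throws IOException {
        PositionTrackingInputStream raw = new PositionTrackingInputStream(new ByteArrayInputStream(data));
        DataInputStream input = new DataInputStream(new BufferedInputStream(raw, bufferSize));
        input.readByte(); // logical position of input is now 1 ...
        return raw.getPos(); // ... but the buffer fill moved the source by up to bufferSize bytes
    }

    /** Mirrors the reported if condition, after a hypothetical earlier read of one byte. */
    static byte readWithUnderlyingPosCheck(byte[] data, int bufferSize, int offset) throws IOException {
        PositionTrackingInputStream raw = new PositionTrackingInputStream(new ByteArrayInputStream(data));
        DataInputStream input = new DataInputStream(new BufferedInputStream(raw, bufferSize));
        input.readByte();
        if (raw.getPos() != offset) { // compares the underlying position, not the input's
            input.skipBytes(offset);
        }
        return input.readByte();
    }

    /** Suggested approach: a freshly wrapped input starts at 0, so skip exactly offset bytes. */
    static byte readAtOffset(byte[] data, int bufferSize, int offset) throws IOException {
        PositionTrackingInputStream raw = new PositionTrackingInputStream(new ByteArrayInputStream(data));
        DataInputStream input = new DataInputStream(new BufferedInputStream(raw, bufferSize));
        if (input.skipBytes(offset) != offset) {
            throw new EOFException("offset " + offset + " is beyond the end of the data");
        }
        return input.readByte();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = sampleData(10_000);
        System.out.println("underlying pos, 10000-byte source: " + underlyingPosAfterOneByte(data, 4096));
        System.out.println("underlying pos, 1000-byte source: " + underlyingPosAfterOneByte(sampleData(1_000), 4096));
        System.out.println("check-based read at 4096 correct: " + (readWithUnderlyingPosCheck(data, 4096, 4096) == data[4096]));
        System.out.println("direct-skip read at 5000 correct: " + (readAtOffset(data, 4096, 5000) == data[5000]));
    }
}
```

In the check-based read, the stale underlying position happens to equal the requested offset, so nothing is skipped and the byte at logical position 1 is returned instead of the byte at 4096. The direct skip only relies on the input being freshly wrapped, i.e. at logical position 0.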



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
