[https://issues.apache.org/jira/browse/FLINK-26799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511760#comment-17511760]
Feifan Wang edited comment on FLINK-26799 at 3/24/22, 10:26 AM:
----------------------------------------------------------------
Thanks very much for your reply, [~roman].
StateChangelogStorageTest.testWriteAndRead as it stands does not expose the problem, but with two small modifications it does:
# change keyLen from 10 to {color:#de350b}405{color}
# call {color:#de350b}writer.nextSequenceNumber(){color} after handling each entry of appendsByKeyGroup
The complete StateChangelogStorageTest.testWriteAndRead with the above modifications is:
{code:java}
@MethodSource("parameters")
@ParameterizedTest(name = "compression = {0}")
public void testWriteAndRead(boolean compression) throws Exception {
    KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
    // modification 1: keyLen changed from 10 to 405
    Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 405, 20);
    try (StateChangelogStorage<T> client = getFactory(compression, temporaryFolder);
            StateChangelogWriter<T> writer =
                    client.createWriter(
                            new OperatorID().toString(), kgRange, new SyncMailboxExecutor())) {
        SequenceNumber prev = writer.initialSequenceNumber();
        for (Map.Entry<Integer, List<byte[]>> entry : appendsByKeyGroup.entrySet()) {
            Integer group = entry.getKey();
            List<byte[]> appends = entry.getValue();
            for (byte[] bytes : appends) {
                writer.append(group, bytes);
            }
            // modification 2: advance the sequence number after each key group
            writer.nextSequenceNumber();
        }
        T handle = writer.persist(prev).get();
        StateChangelogHandleReader<T> reader = client.createReader();
        assertByteMapsEqual(appendsByKeyGroup, extract(handle, reader));
    }
}
{code}
Running {color:#de350b}FsStateChangelogStorageTest{color} with the above modifications reproduces the problem.
> StateChangeFormat#read not seek to offset correctly
> ---------------------------------------------------
>
> Key: FLINK-26799
> URL: https://issues.apache.org/jira/browse/FLINK-26799
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Reporter: Feifan Wang
> Priority: Major
> Attachments: image-2022-03-24-18-12-09-742.png
>
>
> StateChangeFormat#read must seek to the offset before reading. The current
> implementation is as follows:
>
> {code:java}
> FSDataInputStream stream = handle.openInputStream();
> DataInputViewStreamWrapper input = wrap(stream);
> if (stream.getPos() != offset) {
>     LOG.debug("seek from {} to {}", stream.getPos(), offset);
>     input.skipBytesToRead((int) offset);
> }{code}
> However, the if condition is incorrect: stream.getPos() returns the position of the
> underlying stream, which differs from the position of input.
> Moreover, because the underlying stream is wrapped in a BufferedInputStream, its
> position is always at n*bufferSize or at the end of the file.
> In fact, input is always at position 0 at the beginning, so I think we can seek
> to the offset directly.
>
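The position mismatch described above can be illustrated without Flink. Below is a minimal, self-contained sketch, assuming a plain java.io stack as a stand-in: a hypothetical PositionTrackingStream plays the role of the underlying stream (only its byte-counting getPos()), BufferedInputStream is the buffering wrapper, and DataInputStream stands in for DataInputViewStreamWrapper. It shows that after reading one byte the wrapper's logical position is 1 while the underlying position has already jumped to a full buffer (or to the end of the data), and it sketches the suggested fix of skipping exactly offset bytes, unconditionally, from a freshly wrapped input. SeekSketch, PositionTrackingStream, sequentialBytes, positionsAfterOneByte, skipFully and readAt are illustrative names only, not Flink APIs.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

class SeekSketch {

    /** Hypothetical helper: counts how many bytes have been read through it. */
    static final class PositionTrackingStream extends InputStream {
        private final InputStream in;
        private long pos;

        PositionTrackingStream(InputStream in) {
            this.in = in;
        }

        long getPos() {
            return pos;
        }

        @Override
        public int read() throws IOException {
            int b = in.read();
            if (b >= 0) {
                pos++;
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = in.read(buf, off, len);
            if (n > 0) {
                pos += n;
            }
            return n;
        }
    }

    /** Test data: byte i has value i (mod 256). */
    static byte[] sequentialBytes(int n) {
        byte[] data = new byte[n];
        for (int i = 0; i < n; i++) {
            data[i] = (byte) i;
        }
        return data;
    }

    /** Reads one byte and returns {logical position of the wrapper, position of the underlying stream}. */
    static long[] positionsAfterOneByte(byte[] data, int bufferSize) throws IOException {
        PositionTrackingStream underlying = new PositionTrackingStream(new ByteArrayInputStream(data));
        PositionTrackingStream logical =
                new PositionTrackingStream(new BufferedInputStream(underlying, bufferSize));
        DataInputStream input = new DataInputStream(logical);
        input.readByte(); // BufferedInputStream fills its whole buffer from the underlying stream here
        return new long[] {logical.getPos(), underlying.getPos()};
    }

    /** Skips exactly n bytes, looping because skipBytes may skip fewer than requested. */
    static void skipFully(DataInputStream input, long n) throws IOException {
        while (n > 0) {
            int skipped = input.skipBytes((int) Math.min(n, Integer.MAX_VALUE));
            if (skipped <= 0) {
                throw new EOFException(n + " bytes left to skip at end of stream");
            }
            n -= skipped;
        }
    }

    /** Fix sketch: a freshly wrapped input starts at 0, so skip `offset` bytes without comparing positions. */
    static byte readAt(byte[] data, int bufferSize, long offset) throws IOException {
        try (DataInputStream input = new DataInputStream(
                new BufferedInputStream(new ByteArrayInputStream(data), bufferSize))) {
            skipFully(input, offset);
            return input.readByte();
        }
    }
}
```

For example, positionsAfterOneByte(sequentialBytes(100), 16) yields {1, 16}, and with only 10 bytes of data it yields {1, 10} (end of data), while readAt(sequentialBytes(100), 16, 40) returns 40. Comparing the underlying position against the offset therefore compares the wrong quantity; skipping from the known logical start avoids the comparison altogether.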