This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d5e472a [FLINK-26799][state/changelog] fix seek condition in StateChangeFormat#read
d5e472a is described below
commit d5e472af4f817d343fae9073aad162ee13f08d6a
Author: wangfeifan <[email protected]>
AuthorDate: Tue Mar 22 20:11:41 2022 +0800
[FLINK-26799][state/changelog] fix seek condition in StateChangeFormat#read
---
.../src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java | 2 +-
.../runtime/state/changelog/inmemory/StateChangelogStorageTest.java | 3 ++-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index ba04f59..ff21493 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -91,7 +91,7 @@ public class StateChangeFormat
throws IOException {
FSDataInputStream stream = handle.openInputStream();
DataInputViewStreamWrapper input = wrap(stream);
- if (stream.getPos() != offset) {
+ if (offset != 0) {
LOG.debug("seek from {} to {}", stream.getPos(), offset);
input.skipBytesToRead((int) offset);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index 664239e..a130ba7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -81,7 +81,7 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
@ParameterizedTest(name = "compression = {0}")
public void testWriteAndRead(boolean compression) throws Exception {
KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
- Map<Integer, List<byte[]>> appendsByKeyGroup =
generateAppends(kgRange, 10, 20);
+ Map<Integer, List<byte[]>> appendsByKeyGroup =
generateAppends(kgRange, 405, 20);
try (StateChangelogStorage<T> client = getFactory(compression,
temporaryFolder);
StateChangelogWriter<T> writer =
@@ -94,6 +94,7 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
for (byte[] bytes : appends) {
writer.append(group, bytes);
}
+ writer.nextSequenceNumber();
}
T handle = writer.persist(prev).get();