This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 3fed74d [FLINK-26799][state/changelog] fix seek condition in
StateChangeFormat#read
3fed74d is described below
commit 3fed74d757b34f9537de1b48b0bab7840c86649b
Author: wangfeifan <[email protected]>
AuthorDate: Tue Mar 22 20:11:41 2022 +0800
[FLINK-26799][state/changelog] fix seek condition in StateChangeFormat#read
---
.../src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java | 2 +-
.../runtime/state/changelog/inmemory/StateChangelogStorageTest.java | 3 ++-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index ba04f59..ff21493 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -91,7 +91,7 @@ public class StateChangeFormat
throws IOException {
FSDataInputStream stream = handle.openInputStream();
DataInputViewStreamWrapper input = wrap(stream);
- if (stream.getPos() != offset) {
+ if (offset != 0) {
LOG.debug("seek from {} to {}", stream.getPos(), offset);
input.skipBytesToRead((int) offset);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index d7ddf40..4adde98 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -71,7 +71,7 @@ public class StateChangelogStorageTest<T extends
ChangelogStateHandle> {
@Test
public void testWriteAndRead() throws Exception {
KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
- Map<Integer, List<byte[]>> appendsByKeyGroup =
generateAppends(kgRange, 10, 20);
+ Map<Integer, List<byte[]>> appendsByKeyGroup =
generateAppends(kgRange, 405, 20);
try (StateChangelogStorage<T> client = getFactory();
StateChangelogWriter<T> writer =
@@ -84,6 +84,7 @@ public class StateChangelogStorageTest<T extends
ChangelogStateHandle> {
for (byte[] bytes : appends) {
writer.append(group, bytes);
}
+ writer.nextSequenceNumber();
}
T handle = writer.persist(prev).get();