This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 3fed74d  [FLINK-26799][state/changelog] fix seek condition in StateChangeFormat#read
3fed74d is described below

commit 3fed74d757b34f9537de1b48b0bab7840c86649b
Author: wangfeifan <[email protected]>
AuthorDate: Tue Mar 22 20:11:41 2022 +0800

    [FLINK-26799][state/changelog] fix seek condition in StateChangeFormat#read
---
 .../src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java | 2 +-
 .../runtime/state/changelog/inmemory/StateChangelogStorageTest.java    | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index ba04f59..ff21493 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -91,7 +91,7 @@ public class StateChangeFormat
             throws IOException {
         FSDataInputStream stream = handle.openInputStream();
         DataInputViewStreamWrapper input = wrap(stream);
-        if (stream.getPos() != offset) {
+        if (offset != 0) {
             LOG.debug("seek from {} to {}", stream.getPos(), offset);
             input.skipBytesToRead((int) offset);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index d7ddf40..4adde98 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -71,7 +71,7 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
     @Test
     public void testWriteAndRead() throws Exception {
         KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
-        Map<Integer, List<byte[]>> appendsByKeyGroup = 
generateAppends(kgRange, 10, 20);
+        Map<Integer, List<byte[]>> appendsByKeyGroup = 
generateAppends(kgRange, 405, 20);
 
         try (StateChangelogStorage<T> client = getFactory();
                 StateChangelogWriter<T> writer =
@@ -84,6 +84,7 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
                 for (byte[] bytes : appends) {
                     writer.append(group, bytes);
                 }
+                writer.nextSequenceNumber();
             }
 
             T handle = writer.persist(prev).get();

Reply via email to