This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 06b0a98  [FLINK-25492][state/changelog] Fix materialization boundary 
for InMemoryStateChangelogWriter
06b0a98 is described below

commit 06b0a98e4e7e543c96f6560fe007bf61a02f8431
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Jan 28 15:24:59 2022 +0100

    [FLINK-25492][state/changelog] Fix materialization boundary for 
InMemoryStateChangelogWriter
---
 .../flink/changelog/fs/FsStateChangelogWriter.java | 12 +++++--
 .../inmemory/InMemoryStateChangelogWriter.java     |  2 +-
 .../changelog/ChangelogKeyedStateBackend.java      | 12 ++-----
 .../ChangelogDelegateHashMapInMemoryTest.java      | 39 ++++++++++++++++++++++
 4 files changed, 51 insertions(+), 14 deletions(-)

diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index d8bce45..e2b5437 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -105,6 +105,8 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
     /** Current {@link SequenceNumber}. */
     private SequenceNumber activeSequenceNumber = INITIAL_SQN;
 
+    private SequenceNumber lastAppendedSequenceNumber = INITIAL_SQN;
+
     /**
      * {@link SequenceNumber} before which changes will NOT be requested, 
exclusive. Increased after
      * materialization.
@@ -172,14 +174,17 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
 
     @Override
     public SequenceNumber lastAppendedSequenceNumber() {
-        LOG.trace("query {} sqn: {}", logId, activeSequenceNumber);
-        SequenceNumber tmp = activeSequenceNumber;
         // the returned current sequence number must be able to distinguish 
between the changes
         // appended before and after this call so we need to use the next 
sequence number
         // At the same time, we don't want to increment SQN on each append (to 
avoid too many
         // objects and segments in the resulting file).
         rollover();
-        return tmp;
+        LOG.trace(
+                "query {} sqn, last: {}, active: {}",
+                logId,
+                lastAppendedSequenceNumber,
+                activeSequenceNumber);
+        return lastAppendedSequenceNumber;
     }
 
     @Override
@@ -285,6 +290,7 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
         notUploaded.put(
                 activeSequenceNumber,
                 new StateChangeSet(logId, activeSequenceNumber, 
activeChangeSet));
+        lastAppendedSequenceNumber = activeSequenceNumber;
         activeSequenceNumber = activeSequenceNumber.next();
         LOG.debug("bump active sqn to {}", activeSequenceNumber);
         activeChangeSet = new ArrayList<>();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 4fdf0bf..18e4346 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -109,7 +109,7 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryChang
                 .filter(map -> !map.isEmpty())
                 .map(SortedMap::firstKey)
                 .min(Comparator.naturalOrder())
-                .orElse(initialSequenceNumber());
+                .orElse(lastAppendedSequenceNumber().next());
     }
 
     @Override
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 6bb710d..ba9484e 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -374,7 +374,7 @@ public class ChangelogKeyedStateBackend<K>
         // have to split it somehow for the former option, so the latter is 
used.
         lastCheckpointId = checkpointId;
         lastUploadedFrom = changelogSnapshotState.lastMaterializedTo();
-        lastUploadedTo = getLastAppendedTo();
+        lastUploadedTo = 
stateChangelogWriter.lastAppendedSequenceNumber().next();
 
         LOG.info(
                 "snapshot of {} for checkpoint {}, change range: {}..{}",
@@ -629,7 +629,7 @@ public class ChangelogKeyedStateBackend<K>
      *     SequenceNumber} identifying the latest change in the changelog
      */
     public Optional<MaterializationRunnable> initMaterialization() throws 
Exception {
-        SequenceNumber upTo = getLastAppendedTo();
+        SequenceNumber upTo = 
stateChangelogWriter.lastAppendedSequenceNumber().next();
         SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
 
         LOG.info(
@@ -677,14 +677,6 @@ public class ChangelogKeyedStateBackend<K>
         }
     }
 
-    // TODO: Remove after fix FLINK-24436
-    //  FsStateChangelogWriter#lastAppendedSequenceNumber returns different 
seq number
-    //  the first time called vs called after the first time.
-    private SequenceNumber getLastAppendedTo() {
-        stateChangelogWriter.lastAppendedSequenceNumber();
-        return stateChangelogWriter.lastAppendedSequenceNumber();
-    }
-
     /**
      * This method is not thread safe. It should be called either under a lock 
or through task
      * mailbox executor.
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapInMemoryTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapInMemoryTest.java
new file mode 100644
index 0000000..f2c4818
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapInMemoryTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.runtime.state.HashMapStateBackendTest;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link ChangelogStateBackend} using {@link 
InMemoryStateChangelogStorage} and
+ * delegating {@link HashMapStateBackendTest}.
+ */
+public class ChangelogDelegateHashMapInMemoryTest extends 
ChangelogDelegateHashMapTest {
+
+    @Rule public final TemporaryFolder temp = new TemporaryFolder();
+
+    protected TestTaskStateManager getTestTaskStateManager() {
+        return TestTaskStateManager.builder().build();
+    }
+}

Reply via email to