This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 66048d1  [FLINK-25987][state/changelog] Replace lastSqn with nextSqn()
66048d1 is described below

commit 66048d11c1b53b5c6b24cdb20a6d4d3348f87026
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Feb 11 16:49:12 2022 +0100

    [FLINK-25987][state/changelog] Replace lastSqn with nextSqn()
    
    The current semantics of ChangelogWriter.lastAppendedSqn do not allow
    distinguishing between the initial empty state and a non-empty state.
    This results in a wrong SQN being passed to the truncate and persist methods.
    
    This change solves this by replacing lastAppendedSqn with nextSqn.
    It essentially moves the responsibility to call sqn.next() on materialization
    from the backend to the writer, which knows whether there were any changes or not.
---
 .../flink/changelog/fs/FsStateChangelogWriter.java | 13 +++--------
 .../changelog/fs/ChangelogStorageMetricsTest.java  | 25 ++++++++++++++--------
 .../fs/FsStateChangelogWriterSqnTest.java          |  6 +++---
 .../changelog/fs/FsStateChangelogWriterTest.java   |  3 ++-
 .../state/changelog/StateChangelogWriter.java      |  5 +++--
 .../inmemory/InMemoryStateChangelogWriter.java     |  6 +++---
 .../changelog/ChangelogKeyedStateBackend.java      | 11 +++++-----
 .../changelog/ChangelogStateBackendTestUtils.java  |  4 ++--
 .../state/changelog/StateChangeLoggerTestBase.java |  2 +-
 9 files changed, 38 insertions(+), 37 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index e2b5437..62420de 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -105,8 +105,6 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
     /** Current {@link SequenceNumber}. */
     private SequenceNumber activeSequenceNumber = INITIAL_SQN;
 
-    private SequenceNumber lastAppendedSequenceNumber = INITIAL_SQN;
-
     /**
      * {@link SequenceNumber} before which changes will NOT be requested, 
exclusive. Increased after
      * materialization.
@@ -173,18 +171,14 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
     }
 
     @Override
-    public SequenceNumber lastAppendedSequenceNumber() {
+    public SequenceNumber nextSequenceNumber() {
         // the returned current sequence number must be able to distinguish 
between the changes
         // appended before and after this call so we need to use the next 
sequence number
         // At the same time, we don't want to increment SQN on each append (to 
avoid too many
         // objects and segments in the resulting file).
         rollover();
-        LOG.trace(
-                "query {} sqn, last: {}, active: {}",
-                logId,
-                lastAppendedSequenceNumber,
-                activeSequenceNumber);
-        return lastAppendedSequenceNumber;
+        LOG.trace("query {} sqn: {}", logId, activeSequenceNumber);
+        return activeSequenceNumber;
     }
 
     @Override
@@ -290,7 +284,6 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         notUploaded.put(
                 activeSequenceNumber,
                 new StateChangeSet(logId, activeSequenceNumber, 
activeChangeSet));
-        lastAppendedSequenceNumber = activeSequenceNumber;
         activeSequenceNumber = activeSequenceNumber.next();
         LOG.debug("bump active sqn to {}", activeSequenceNumber);
         activeChangeSet = new ArrayList<>();
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index c09ed12..6093ad9 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
 
 import org.junit.Rule;
@@ -61,8 +62,9 @@ public class ChangelogStorageMetricsTest {
 
             int numUploads = 5;
             for (int i = 0; i < numUploads; i++) {
+                SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0, 1, 2, 3});
-                writer.persist(writer.lastAppendedSequenceNumber()).get();
+                writer.persist(from).get();
             }
             assertEquals(numUploads, metrics.getUploadsCounter().getCount());
             
assertTrue(metrics.getUploadLatenciesNanos().getStatistics().getMin() > 0);
@@ -80,14 +82,16 @@ public class ChangelogStorageMetricsTest {
             FsStateChangelogWriter writer = storage.createWriter("writer", 
EMPTY_KEY_GROUP_RANGE);
 
             // upload single byte to infer header size
+            SequenceNumber from = writer.nextSequenceNumber();
             writer.append(0, new byte[] {0});
-            writer.persist(writer.lastAppendedSequenceNumber()).get();
+            writer.persist(from).get();
             long headerSize = 
metrics.getUploadSizes().getStatistics().getMin() - 1;
 
             byte[] upload = new byte[33];
             for (int i = 0; i < 5; i++) {
+                from = writer.nextSequenceNumber();
                 writer.append(0, upload);
-                writer.persist(writer.lastAppendedSequenceNumber()).get();
+                writer.persist(from).get();
             }
             long expected = upload.length + headerSize;
             assertEquals(expected, 
metrics.getUploadSizes().getStatistics().getMax());
@@ -105,9 +109,10 @@ public class ChangelogStorageMetricsTest {
 
             int numUploads = 5;
             for (int i = 0; i < numUploads; i++) {
+                SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0, 1, 2, 3});
                 try {
-                    writer.persist(writer.lastAppendedSequenceNumber()).get();
+                    writer.persist(from).get();
                 } catch (IOException e) {
                     // ignore
                 }
@@ -148,10 +153,10 @@ public class ChangelogStorageMetricsTest {
             for (int upload = 0; upload < numUploads; upload++) {
                 for (int writer = 0; writer < numWriters; writer++) {
                     // with all thresholds on MAX and manually triggered 
executor, this shouldn't
-                    // cause
-                    // actual uploads
+                    // cause actual uploads
+                    SequenceNumber from = writers[writer].nextSequenceNumber();
                     writers[writer].append(0, new byte[] {0, 1, 2, 3});
-                    
writers[writer].persist(writers[writer].lastAppendedSequenceNumber());
+                    writers[writer].persist(from);
                 }
                 // now the uploads should be grouped and executed at once
                 scheduler.triggerScheduledTasks();
@@ -186,8 +191,9 @@ public class ChangelogStorageMetricsTest {
 
         try {
             for (int upload = 0; upload < numUploads; upload++) {
+                SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0, 1, 2, 3});
-                writer.persist(writer.lastAppendedSequenceNumber()).get();
+                writer.persist(from).get();
             }
             HistogramStatistics histogram = 
metrics.getAttemptsPerUpload().getStatistics();
             assertEquals(maxAttempts, histogram.getMin());
@@ -236,8 +242,9 @@ public class ChangelogStorageMetricsTest {
             FsStateChangelogWriter writer = storage.createWriter("writer", 
EMPTY_KEY_GROUP_RANGE);
             int numUploads = 11;
             for (int i = 0; i < numUploads; i++) {
+                SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0});
-                writer.persist(writer.lastAppendedSequenceNumber());
+                writer.persist(from);
             }
             assertEquals(numUploads, (int) queueSizeGauge.get().getValue());
             scheduler.triggerScheduledTasks();
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index 447e23b..23217dc 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -43,10 +43,10 @@ public class FsStateChangelogWriterSqnTest {
     @Parameterized.Parameters(name = "{0}")
     public static List<WriterSqnTestSettings> getSettings() {
         return asList(
-                of(StateChangelogWriter::lastAppendedSequenceNumber, 
"lastAppendedSequenceNumber")
+                of(StateChangelogWriter::nextSequenceNumber, 
"nextSequenceNumber")
                         .withAppendCall(false)
                         .expectIncrement(false),
-                of(StateChangelogWriter::lastAppendedSequenceNumber, 
"lastAppendedSequenceNumber")
+                of(StateChangelogWriter::nextSequenceNumber, 
"nextSequenceNumber")
                         .withAppendCall(true)
                         .expectIncrement(true),
                 of(FsStateChangelogWriterSqnTest::persistAll, "persist")
@@ -146,7 +146,7 @@ public class FsStateChangelogWriterSqnTest {
     }
 
     private static void truncateLast(FsStateChangelogWriter writer) {
-        writer.truncate(writer.lastAppendedSequenceNumber());
+        writer.truncate(writer.nextSequenceNumber());
     }
 
     private static void truncateAll(FsStateChangelogWriter writer) {
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index c9fc913..0e1622a 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -223,8 +223,9 @@ public class FsStateChangelogWriterTest {
     }
 
     private SequenceNumber append(FsStateChangelogWriter writer, byte[] bytes) 
throws IOException {
+        SequenceNumber sequenceNumber = writer.nextSequenceNumber();
         writer.append(KEY_GROUP, bytes);
-        return writer.lastAppendedSequenceNumber();
+        return sequenceNumber;
     }
 
     private byte[] getBytes() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index 7d69f18..48e8844 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -31,9 +31,10 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten
     SequenceNumber initialSequenceNumber();
 
     /**
-     * Get {@link SequenceNumber} of the last element added by {@link 
#append(int, byte[]) append}.
+     * Get {@link SequenceNumber} to be used for the next element added by 
{@link #append(int,
+     * byte[]) append}.
      */
-    SequenceNumber lastAppendedSequenceNumber();
+    SequenceNumber nextSequenceNumber();
 
     /** Appends the provided data to this log. No persistency guarantees. */
     void append(int keyGroup, byte[] value) throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 18e4346..19043de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -61,8 +61,8 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
     public void append(int keyGroup, byte[] value) {
         Preconditions.checkState(!closed, "LogWriter is closed");
         LOG.trace("append, keyGroup={}, {} bytes", keyGroup, value.length);
-        sqn = sqn.next();
         changesByKeyGroup.computeIfAbsent(keyGroup, unused -> new 
TreeMap<>()).put(sqn, value);
+        sqn = sqn.next();
     }
 
     @Override
@@ -71,7 +71,7 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
     }
 
     @Override
-    public SequenceNumber lastAppendedSequenceNumber() {
+    public SequenceNumber nextSequenceNumber() {
         return sqn;
     }
 
@@ -109,7 +109,7 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
                 .filter(map -> !map.isEmpty())
                 .map(SortedMap::firstKey)
                 .min(Comparator.naturalOrder())
-                .orElse(lastAppendedSequenceNumber().next());
+                .orElse(nextSequenceNumber());
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 8000050..07e6bf3 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -189,10 +189,9 @@ public class ChangelogKeyedStateBackend<K>
     @Nullable private SequenceNumber lastUploadedFrom;
     /**
      * {@link SequenceNumber} denoting last upload range <b>end</b>, 
exclusive. Updated to {@link
-     * 
org.apache.flink.runtime.state.changelog.StateChangelogWriter#lastAppendedSequenceNumber}
-     * when {@link #snapshot(long, long, CheckpointStreamFactory, 
CheckpointOptions) starting
-     * snapshot}. Used to notify {@link #stateChangelogWriter} about changelog 
ranges that were
-     * confirmed or aborted by JM.
+     * StateChangelogWriter#nextSequenceNumber()} when {@link #snapshot(long, 
long,
+     * CheckpointStreamFactory, CheckpointOptions) starting snapshot}. Used to 
notify {@link
+     * #stateChangelogWriter} about changelog ranges that were confirmed or 
aborted by JM.
      */
     @Nullable private SequenceNumber lastUploadedTo;
 
@@ -374,7 +373,7 @@ public class ChangelogKeyedStateBackend<K>
         // have to split it somehow for the former option, so the latter is 
used.
         lastCheckpointId = checkpointId;
         lastUploadedFrom = changelogSnapshotState.lastMaterializedTo();
-        lastUploadedTo = 
stateChangelogWriter.lastAppendedSequenceNumber().next();
+        lastUploadedTo = stateChangelogWriter.nextSequenceNumber();
 
         LOG.info(
                 "snapshot of {} for checkpoint {}, change range: {}..{}",
@@ -626,7 +625,7 @@ public class ChangelogKeyedStateBackend<K>
      *     SequenceNumber} identifying the latest change in the changelog
      */
     public Optional<MaterializationRunnable> initMaterialization() throws 
Exception {
-        SequenceNumber upTo = 
stateChangelogWriter.lastAppendedSequenceNumber().next();
+        SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber();
         SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
 
         LOG.info(
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index c3710dd..cf71b3e 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -250,11 +250,11 @@ public class ChangelogStateBackendTestUtils {
             PeriodicMaterializationManager periodicMaterializationManager) {
         StateChangelogWriter<? extends ChangelogStateHandle> writer =
                 keyedBackend.getChangelogWriter();
-        SequenceNumber sqnBefore = writer.lastAppendedSequenceNumber();
+        SequenceNumber sqn = writer.nextSequenceNumber();
         periodicMaterializationManager.triggerMaterialization();
         assertTrue(
                 "Materialization didn't truncate the changelog",
-                sqnBefore.compareTo(writer.getLowestSequenceNumber()) <= 0);
+                sqn.compareTo(writer.getLowestSequenceNumber()) <= 0);
     }
 
     public static void testMaterializedRestoreForPriorityQueue(
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index 2566777..b23b9d5 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -115,7 +115,7 @@ abstract class StateChangeLoggerTestBase<Namespace> {
         }
 
         @Override
-        public SequenceNumber lastAppendedSequenceNumber() {
+        public SequenceNumber nextSequenceNumber() {
             throw new UnsupportedOperationException();
         }
 

Reply via email to