This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 06b0a98 [FLINK-25492][state/changelog] Fix materialization boundary for InMemoryStateChangelogWriter
06b0a98 is described below
commit 06b0a98e4e7e543c96f6560fe007bf61a02f8431
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Jan 28 15:24:59 2022 +0100
[FLINK-25492][state/changelog] Fix materialization boundary for InMemoryStateChangelogWriter
---
.../flink/changelog/fs/FsStateChangelogWriter.java | 12 +++++--
.../inmemory/InMemoryStateChangelogWriter.java | 2 +-
.../changelog/ChangelogKeyedStateBackend.java | 12 ++-----
.../ChangelogDelegateHashMapInMemoryTest.java | 39 ++++++++++++++++++++++
4 files changed, 51 insertions(+), 14 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index d8bce45..e2b5437 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -105,6 +105,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
/** Current {@link SequenceNumber}. */
private SequenceNumber activeSequenceNumber = INITIAL_SQN;
+ private SequenceNumber lastAppendedSequenceNumber = INITIAL_SQN;
+
/**
* {@link SequenceNumber} before which changes will NOT be requested, exclusive. Increased after
* materialization.
@@ -172,14 +174,17 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
@Override
public SequenceNumber lastAppendedSequenceNumber() {
- LOG.trace("query {} sqn: {}", logId, activeSequenceNumber);
- SequenceNumber tmp = activeSequenceNumber;
// the returned current sequence number must be able to distinguish between the changes
// appended before and after this call so we need to use the next sequence number
// At the same time, we don't want to increment SQN on each append (to avoid too many
// objects and segments in the resulting file).
rollover();
- return tmp;
+ LOG.trace(
+ "query {} sqn, last: {}, active: {}",
+ logId,
+ lastAppendedSequenceNumber,
+ activeSequenceNumber);
+ return lastAppendedSequenceNumber;
}
@Override
@@ -285,6 +290,7 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
notUploaded.put(
activeSequenceNumber,
new StateChangeSet(logId, activeSequenceNumber, activeChangeSet));
+ lastAppendedSequenceNumber = activeSequenceNumber;
activeSequenceNumber = activeSequenceNumber.next();
LOG.debug("bump active sqn to {}", activeSequenceNumber);
activeChangeSet = new ArrayList<>();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 4fdf0bf..18e4346 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -109,7 +109,7 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
.filter(map -> !map.isEmpty())
.map(SortedMap::firstKey)
.min(Comparator.naturalOrder())
- .orElse(initialSequenceNumber());
+ .orElse(lastAppendedSequenceNumber().next());
}
@Override
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 6bb710d..ba9484e 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -374,7 +374,7 @@ public class ChangelogKeyedStateBackend<K>
// have to split it somehow for the former option, so the latter is used.
lastCheckpointId = checkpointId;
lastUploadedFrom = changelogSnapshotState.lastMaterializedTo();
- lastUploadedTo = getLastAppendedTo();
+ lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next();
LOG.info(
"snapshot of {} for checkpoint {}, change range: {}..{}",
@@ -629,7 +629,7 @@ public class ChangelogKeyedStateBackend<K>
* SequenceNumber} identifying the latest change in the changelog
*/
public Optional<MaterializationRunnable> initMaterialization() throws Exception {
- SequenceNumber upTo = getLastAppendedTo();
+ SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber().next();
SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo();
LOG.info(
@@ -677,14 +677,6 @@ public class ChangelogKeyedStateBackend<K>
}
}
- // TODO: Remove after fix FLINK-24436
- // FsStateChangelogWriter#lastAppendedSequenceNumber returns different seq number
- // the first time called vs called after the first time.
- private SequenceNumber getLastAppendedTo() {
- stateChangelogWriter.lastAppendedSequenceNumber();
- return stateChangelogWriter.lastAppendedSequenceNumber();
- }
-
/**
* This method is not thread safe. It should be called either under a lock or through task
* mailbox executor.
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapInMemoryTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapInMemoryTest.java
new file mode 100644
index 0000000..f2c4818
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapInMemoryTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.runtime.state.HashMapStateBackendTest;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link ChangelogStateBackend} using {@link InMemoryStateChangelogStorage} and
+ * delegating {@link HashMapStateBackendTest}.
+ */
+public class ChangelogDelegateHashMapInMemoryTest extends ChangelogDelegateHashMapTest {
+
+ @Rule public final TemporaryFolder temp = new TemporaryFolder();
+
+ protected TestTaskStateManager getTestTaskStateManager() {
+ return TestTaskStateManager.builder().build();
+ }
+}