This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c23a3002b83 [FLINK-30863][state/changelog] Register local recovery
files of changelog before notifyCheckpointComplete()
c23a3002b83 is described below
commit c23a3002b837ecffa2d561a405c17c979c814c6e
Author: Yanfei Lei <[email protected]>
AuthorDate: Tue Aug 29 16:37:16 2023 +0800
[FLINK-30863][state/changelog] Register local recovery files of changelog
before notifyCheckpointComplete()
This closes https://github.com/apache/flink/pull/21822
---
.../fs/DuplicatingStateChangeFsUploader.java | 12 +-
.../changelog/fs/FsStateChangelogStorage.java | 2 +
.../flink/changelog/fs/FsStateChangelogWriter.java | 48 ++--
.../changelog/fs/ChangelogStorageMetricsTest.java | 16 +-
.../fs/DiscardRecordableStateChangeUploader.java | 3 +-
.../fs/FsStateChangelogWriterSqnTest.java | 2 +-
.../changelog/fs/FsStateChangelogWriterTest.java | 263 +++++++++++++++++++--
.../state/changelog/LocalChangelogRegistry.java | 15 +-
.../changelog/LocalChangelogRegistryImpl.java | 26 +-
.../state/changelog/StateChangelogWriter.java | 11 +-
.../inmemory/InMemoryStateChangelogWriter.java | 5 +-
.../runtime/state/TestLocalRecoveryConfig.java | 4 +
.../changelog/LocalChangelogRegistryTest.java | 2 +-
.../inmemory/StateChangelogStorageTest.java | 2 +-
.../changelog/ChangelogKeyedStateBackend.java | 2 +-
.../state/changelog/ChangelogTruncateHelper.java | 1 -
.../state/changelog/StateChangeLoggerTestBase.java | 6 +-
17 files changed, 319 insertions(+), 101 deletions(-)
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
index cf99ed17683..50db749b973 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.slf4j.Logger;
@@ -51,14 +52,13 @@ import static
org.apache.flink.runtime.state.ChangelogTaskLocalStateStore.getLoc
* <li>Store the meta of files into {@link ChangelogTaskLocalStateStore}
by
* AsyncCheckpointRunnable#reportCompletedSnapshotStates().
* <li>Pass control of the file to {@link
LocalChangelogRegistry#register} when
- * ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of
the previous
- * checkpoint will be deleted by {@link
LocalChangelogRegistry#discardUpToCheckpoint} at
- * the same time.
+ * FsStateChangelogWriter#persist is called; files of the previous checkpoint
will be deleted by
+ * {@link LocalChangelogRegistry#discardUpToCheckpoint} when the
checkpoint is confirmed.
* <li>When ChangelogTruncateHelper#materialized() or
* ChangelogTruncateHelper#checkpointSubsumed() is called, {@link
- * TaskChangelogRegistry#notUsed} is responsible for deleting local
files.
- * <li>When one checkpoint is aborted, the dstl files of this checkpoint
will be deleted by
- * {@link LocalChangelogRegistry#prune} in {@link
FsStateChangelogWriter#reset}.
+ * TaskChangelogRegistry#release} is responsible for deleting local
files.
+ * <li>When one checkpoint is aborted, all accumulated local dstl files
will be deleted at
+ * once.
* </ol>
*/
public class DuplicatingStateChangeFsUploader extends
AbstractStateChangeFsUploader {
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index 5ba18f701ea..1d4668cb8e8 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -32,6 +32,7 @@ import
org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,6 +168,7 @@ public class FsStateChangelogStorage extends
FsStateChangelogStorageForRecovery
@Override
public void close() throws Exception {
uploader.close();
+ IOUtils.closeQuietly(localChangelogRegistry);
}
@Override
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index aebb9adf58f..f0a4b21646e 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -63,9 +63,9 @@ import static org.apache.flink.util.Preconditions.checkState;
* thread synchronization); {@link SequenceNumber} is not changed.
*
* <p>However, if they exceed {@link #preEmptivePersistThresholdInBytes} then
{@link
- * #persistInternal(SequenceNumber) persist} is called.
+ * #persistInternal(SequenceNumber, long) persist} is called.
*
- * <p>On {@link #persist(SequenceNumber) persist}, accumulated changes are
sent to the {@link
+ * <p>On {@link #persist(SequenceNumber, long) persist}, accumulated changes
are sent to the {@link
* StateChangeUploadScheduler} as an immutable {@link
StateChangeUploadScheduler.UploadTask task}.
* An {@link FsStateChangelogWriter.UploadCompletionListener upload listener}
is also registered.
* Upon notification it updates the Writer local state (for future persist
calls) and completes the
@@ -90,6 +90,7 @@ import static org.apache.flink.util.Preconditions.checkState;
@NotThreadSafe
class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandleStreamImpl> {
private static final Logger LOG =
LoggerFactory.getLogger(FsStateChangelogWriter.class);
+ private static final long DUMMY_PERSIST_CHECKPOINT = -1L;
private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L);
private final UUID logId;
@@ -207,13 +208,13 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
@Override
public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
persist(
- SequenceNumber from) throws IOException {
+ SequenceNumber from, long checkpointId) throws IOException {
LOG.debug(
"persist {} starting from sqn {} (incl.), active sqn: {}",
logId,
from,
activeSequenceNumber);
- return persistInternal(from);
+ return persistInternal(from, checkpointId);
}
private void preEmptiveFlushIfNeeded(byte[] value) throws IOException {
@@ -222,17 +223,31 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
LOG.debug(
"pre-emptively flush {}MB of appended changes to the
common store",
activeChangeSetSize / 1024 / 1024);
- persistInternal(notUploaded.isEmpty() ? activeSequenceNumber :
notUploaded.firstKey());
+ persistInternal(
+ notUploaded.isEmpty() ? activeSequenceNumber :
notUploaded.firstKey(),
+ DUMMY_PERSIST_CHECKPOINT);
}
}
private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
persistInternal(
- SequenceNumber from) throws IOException {
+ SequenceNumber from, long checkpointId) throws IOException {
ensureCanPersist(from);
rollover();
Map<SequenceNumber, StateChangeSet> toUpload =
drainTailMap(notUploaded, from);
NavigableMap<SequenceNumber, UploadResult> readyToReturn =
uploaded.tailMap(from, true);
- LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn,
toUpload);
+ LOG.debug(
+ "collected readyToReturn: {}, toUpload: {}, checkpointId: {}.",
+ readyToReturn,
+ toUpload,
+ checkpointId);
+
+ if (checkpointId != DUMMY_PERSIST_CHECKPOINT) {
+ for (UploadResult uploadResult : readyToReturn.values()) {
+ if (uploadResult.localStreamHandle != null) {
+
localChangelogRegistry.register(uploadResult.localStreamHandle, checkpointId);
+ }
+ }
+ }
SequenceNumberRange range = SequenceNumberRange.generic(from,
activeSequenceNumber);
if (range.size() == readyToReturn.size()) {
@@ -248,7 +263,7 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
UploadTask uploadTask =
new UploadTask(
toUpload.values(),
- this::handleUploadSuccess,
+ uploadResults ->
handleUploadSuccess(uploadResults, checkpointId),
this::handleUploadFailure);
uploader.upload(uploadTask);
}
@@ -276,7 +291,7 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
"handleUploadFailure");
}
- private void handleUploadSuccess(List<UploadResult> results) {
+ private void handleUploadSuccess(List<UploadResult> results, long
checkpointId) {
mailboxExecutor.execute(
() -> {
if (closed) {
@@ -287,6 +302,12 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
} else {
uploadCompletionListeners.removeIf(listener ->
listener.onSuccess(results));
for (UploadResult result : results) {
+ if (checkpointId != DUMMY_PERSIST_CHECKPOINT) {
+ if (result.localStreamHandle != null) {
+ localChangelogRegistry.register(
+ result.localStreamHandle,
checkpointId);
+ }
+ }
SequenceNumber resultSqn = result.sequenceNumber;
if (resultSqn.compareTo(lowestSequenceNumber) >= 0
&&
resultSqn.compareTo(highestSequenceNumber) < 0) {
@@ -373,19 +394,14 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
.forEach(
localHandle -> {
changelogRegistry.stopTracking(localHandle);
- localChangelogRegistry.register(localHandle,
checkpointId);
});
localChangelogRegistry.discardUpToCheckpoint(checkpointId);
}
- @Override
- public void subsume(long checkpointId) {
- localChangelogRegistry.discardUpToCheckpoint(checkpointId);
- }
-
@Override
public void reset(SequenceNumber from, SequenceNumber to, long
checkpointId) {
- localChangelogRegistry.prune(checkpointId);
+ // Delete all accumulated local dstl files when a checkpoint is aborted.
+ localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1);
}
private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index ebf99c338fd..97db57f6547 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -78,7 +78,7 @@ public class ChangelogStorageMetricsTest {
for (int i = 0; i < numUploads; i++) {
SequenceNumber from = writer.nextSequenceNumber();
writer.append(0, new byte[] {0, 1, 2, 3});
- writer.persist(from).get();
+ writer.persist(from, 1L).get();
}
assertThat(metrics.getUploadsCounter().getCount()).isEqualTo(numUploads);
assertThat(metrics.getUploadLatenciesNanos().getStatistics().getMin()).isGreaterThan(0);
@@ -104,14 +104,14 @@ public class ChangelogStorageMetricsTest {
// upload single byte to infer header size
SequenceNumber from = writer.nextSequenceNumber();
writer.append(0, new byte[] {0});
- writer.persist(from).get();
+ writer.persist(from, 1L).get();
long headerSize =
metrics.getUploadSizes().getStatistics().getMin() - 1;
byte[] upload = new byte[33];
for (int i = 0; i < 5; i++) {
from = writer.nextSequenceNumber();
writer.append(0, upload);
- writer.persist(from).get();
+ writer.persist(from, 1L).get();
}
long expected = upload.length + headerSize;
assertThat(metrics.getUploadSizes().getStatistics().getMax()).isEqualTo(expected);
@@ -140,7 +140,7 @@ public class ChangelogStorageMetricsTest {
SequenceNumber from = writer.nextSequenceNumber();
writer.append(0, new byte[] {0, 1, 2, 3});
try {
- writer.persist(from).get();
+ writer.persist(from, 1L).get();
} catch (IOException e) {
// ignore
}
@@ -201,7 +201,7 @@ public class ChangelogStorageMetricsTest {
// cause actual uploads
SequenceNumber from = writers[writer].nextSequenceNumber();
writers[writer].append(0, new byte[] {0, 1, 2, 3});
- writers[writer].persist(from);
+ writers[writer].persist(from, 1L);
}
// now the uploads should be grouped and executed at once
scheduler.triggerScheduledTasks();
@@ -248,7 +248,7 @@ public class ChangelogStorageMetricsTest {
for (int upload = 0; upload < numUploads; upload++) {
SequenceNumber from = writer.nextSequenceNumber();
writer.append(0, new byte[] {0, 1, 2, 3});
- writer.persist(from).get();
+ writer.persist(from, 1L).get();
}
} finally {
storage.close();
@@ -293,7 +293,7 @@ public class ChangelogStorageMetricsTest {
for (int upload = 0; upload < numUploads; upload++) {
SequenceNumber from = writer.nextSequenceNumber();
writer.append(0, new byte[] {0, 1, 2, 3});
- writer.persist(from).get();
+ writer.persist(from, 1L).get();
}
} finally {
storage.close();
@@ -359,7 +359,7 @@ public class ChangelogStorageMetricsTest {
for (int i = 0; i < numUploads; i++) {
SequenceNumber from = writer.nextSequenceNumber();
writer.append(0, new byte[] {0});
- writer.persist(from);
+ writer.persist(from, 1L);
}
assertThat((int)
queueSizeGauge.get().getValue()).isEqualTo(numUploads);
scheduler.triggerScheduledTasks();
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
index 90199f304f5..d263c751b4e 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/DiscardRecordableStateChangeUploader.java
@@ -55,8 +55,9 @@ public class DiscardRecordableStateChangeUploader implements
StateChangeUploader
// fake StreamStateHandle without data, just for discarding records
StreamStateHandle handle = new TestingStreamStateHandle();
+ StreamStateHandle localHandle = new TestingStreamStateHandle();
changelogRegistry.startTracking(handle, numOfChangeSets);
- return new UploadTasksResult(tasksOffsets, handle);
+ return new UploadTasksResult(tasksOffsets, handle, localHandle);
}
public boolean isDiscarded(StreamStateHandle handle) {
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index 4ea8f665002..b3beabb6ae2 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -153,6 +153,6 @@ public class FsStateChangelogWriterSqnTest {
}
private static void persistAll(FsStateChangelogWriter writer) throws
IOException {
- writer.persist(writer.initialSequenceNumber());
+ writer.persist(writer.initialSequenceNumber(), 1L);
}
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index f5be2ad2deb..12f1df85d27 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.BiConsumerWithException;
@@ -68,7 +69,7 @@ class FsStateChangelogWriterTest {
SequenceNumber sqn = append(writer, bytes);
assertSubmittedOnly(uploader, bytes);
uploader.reset();
- writer.persist(sqn);
+ writer.persist(sqn, 1L);
assertNoUpload(uploader, "changes should have been
pre-uploaded");
});
}
@@ -79,7 +80,7 @@ class FsStateChangelogWriterTest {
(writer, uploader) -> {
byte[] bytes = getBytes();
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
- writer.persist(append(writer, bytes));
+ writer.persist(append(writer, bytes), 1L);
assertSubmittedOnly(uploader, bytes);
uploader.completeUpload();
assertThat(
@@ -100,11 +101,11 @@ class FsStateChangelogWriterTest {
(writer, uploader) -> {
byte[] bytes = getBytes();
SequenceNumber sqn = append(writer, bytes);
- writer.persist(sqn);
+ writer.persist(sqn, 1L);
uploader.completeUpload();
uploader.reset();
writer.confirm(sqn, writer.nextSequenceNumber(), 1L);
- writer.persist(sqn);
+ writer.persist(sqn, 2L);
assertNoUpload(uploader, "confirmed changes shouldn't be
re-uploaded");
});
}
@@ -134,11 +135,12 @@ class FsStateChangelogWriterTest {
writer.append(KEY_GROUP, getBytes(10)); // sqn: 0
+ long checkpointId = 1L;
// checkpoint 1 trigger
SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
- writer.persist(initialSqn);
+ writer.persist(initialSqn, checkpointId);
uploadScheduler.scheduleAll(); // checkpoint 1 completed
- writer.confirm(initialSqn, checkpoint1sqn, 1);
+ writer.confirm(initialSqn, checkpoint1sqn, checkpointId);
writer.append(KEY_GROUP, getBytes(10)); // sqn: 1
@@ -150,9 +152,9 @@ class FsStateChangelogWriterTest {
// materialization 1 completed
// checkpoint 2 trigger
SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
- writer.persist(materializationSqn);
+ writer.persist(materializationSqn, ++checkpointId);
uploadScheduler.scheduleAll(); // checkpoint 2 completed
- writer.confirm(materializationSqn, checkpoint2sqn, 2);
+ writer.confirm(materializationSqn, checkpoint2sqn, checkpointId);
// checkpoint 1 subsumed
writer.truncate(
@@ -164,9 +166,9 @@ class FsStateChangelogWriterTest {
// checkpoint 3 trigger
SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
- writer.persist(materializationSqn);
+ writer.persist(materializationSqn, ++checkpointId);
uploadScheduler.scheduleAll(); // checkpoint 3 completed
- writer.confirm(materializationSqn, checkpoint3sqn, 3);
+ writer.confirm(materializationSqn, checkpoint3sqn, checkpointId);
// trigger pre-emptive upload
writer.append(KEY_GROUP, getBytes(100)); // sqn: 4
@@ -181,9 +183,9 @@ class FsStateChangelogWriterTest {
// checkpoint 4 trigger
SequenceNumber checkpoint4sqn = writer.nextSequenceNumber();
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
future =
- writer.persist(materializationSqn);
+ writer.persist(materializationSqn, ++checkpointId);
uploadScheduler.scheduleAll(); // checkpoint 4 completed
- writer.confirm(materializationSqn, checkpoint4sqn, 4);
+ writer.confirm(materializationSqn, checkpoint4sqn, checkpointId);
SnapshotResult<ChangelogStateHandleStreamImpl> result =
future.get();
ChangelogStateHandleStreamImpl resultHandle =
result.getJobManagerOwnedSnapshot();
@@ -225,7 +227,7 @@ class FsStateChangelogWriterTest {
// checkpoint 1 trigger
SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
future =
- writer.persist(initialSqn);
+ writer.persist(initialSqn, 1L);
// trigger pre-emptive upload
writer.append(KEY_GROUP, getBytes(100)); // sqn: 1
@@ -246,15 +248,228 @@ class FsStateChangelogWriterTest {
}
}
+ @Test
+ void testLocalFileDiscard() throws Exception {
+ long appendPersistThreshold = 100;
+ TaskChangelogRegistry taskChangelogRegistry =
+ new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+ try (DiscardRecordableStateChangeUploader uploader =
+ new
DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+ TestingBatchingUploadScheduler uploadScheduler =
+ new TestingBatchingUploadScheduler(uploader);
+ FsStateChangelogWriter writer =
+ new FsStateChangelogWriter(
+ UUID.randomUUID(),
+ KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+ uploadScheduler,
+ appendPersistThreshold,
+ new SyncMailboxExecutor(),
+ taskChangelogRegistry,
+ TestLocalRecoveryConfig.enabledForTest(),
+ new LocalChangelogRegistryImpl(
+
Executors.newDirectExecutorService()))) {
+ SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+ writer.append(KEY_GROUP, getBytes(10));
+
+ long checkpointId = 1L;
+ // checkpoint 1 trigger
+ SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+ writer.persist(initialSqn, checkpointId);
+ uploadScheduler.scheduleAll(); // checkpoint 1 completed
+ writer.confirm(initialSqn, checkpoint1sqn, checkpointId);
+
+ // trigger pre-emptive upload
+ writer.append(KEY_GROUP, getBytes(100));
+ uploadScheduler.scheduleAll();
+ writer.append(KEY_GROUP, getBytes(10));
+ // checkpoint 2 trigger
+ SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+ CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
future2 =
+ writer.persist(initialSqn, ++checkpointId);
+ uploadScheduler.scheduleAll(); // checkpoint 2 completed
+ writer.confirm(initialSqn, checkpoint2sqn, checkpointId);
+ SnapshotResult<ChangelogStateHandleStreamImpl> result2 =
future2.get();
+ for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+ result2.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+ assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
+ }
+
+ // materialization 1 trigger
+ SequenceNumber materializationSqn = writer.nextSequenceNumber();
+ writer.append(KEY_GROUP, getBytes(10));
+
+ // materialization 1 completed
+ // checkpoint 3 trigger
+ SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+ writer.persist(materializationSqn, ++checkpointId);
+ uploadScheduler.scheduleAll(); // checkpoint 3 completed
+ writer.confirm(materializationSqn, checkpoint3sqn, checkpointId);
+ for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+ result2.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+ assertThat(uploader.isDiscarded(handleAndOffset.f0)).isTrue();
+ }
+ }
+ }
+
+ @Test
+ void testLocalFileAfterMaterialize() throws Exception {
+ // If local files were registered on confirm(), the following case would
fail:
+ // cp1 trigger: file1,file1'(local)
+ // JM: register [file1] to sharedRegistry
+ // cp1 complete: stopTracking [file1], register [file1'] to
localRegistry
+ // cp2 trigger: file1,file1',file2,file2'
+ // JM: register [file1,file2] to sharedRegistry
+ // cp2 complete: stopTracking [file1,file1',file2,file2'], register
[file1',file2'] to
+ // localRegistry
+ // cp1 subsume
+ // cp3 trigger: file1,file1',file2,file2',file3,file3'
+ // materialization: uploaded.clear()
+ // JM: register [file1,file2,file3] to sharedRegistry
+ // cp3 complete: stopTracking [], register [] to localRegistry
+ // cp2 subsume: [file1', file2'] are discarded
+ // if restore from cp3: local file1',file2' are not found
+ long appendPersistThreshold = 100;
+ TaskChangelogRegistry taskChangelogRegistry =
+ new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+ try (DiscardRecordableStateChangeUploader uploader =
+ new
DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+ TestingBatchingUploadScheduler uploadScheduler =
+ new TestingBatchingUploadScheduler(uploader);
+ FsStateChangelogWriter writer =
+ new FsStateChangelogWriter(
+ UUID.randomUUID(),
+ KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+ uploadScheduler,
+ appendPersistThreshold,
+ new SyncMailboxExecutor(),
+ taskChangelogRegistry,
+ TestLocalRecoveryConfig.enabledForTest(),
+ new LocalChangelogRegistryImpl(
+
Executors.newDirectExecutorService()))) {
+ SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+ writer.append(KEY_GROUP, getBytes(10));
+
+ long checkpointId = 1L;
+ // checkpoint 1 trigger
+ SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+ writer.persist(initialSqn, checkpointId);
+ uploadScheduler.scheduleAll(); // checkpoint 1 completed
+ writer.confirm(initialSqn, checkpoint1sqn, checkpointId);
+
+ writer.append(KEY_GROUP, getBytes(10));
+ // checkpoint 2 trigger
+ SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+ writer.persist(initialSqn, ++checkpointId);
+ uploadScheduler.scheduleAll(); // checkpoint 2 completed
+ writer.confirm(initialSqn, checkpoint2sqn, checkpointId);
+
+ writer.append(KEY_GROUP, getBytes(10));
+ // checkpoint 3 trigger
+ SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+ CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
future3 =
+ writer.persist(initialSqn, ++checkpointId);
+ uploadScheduler.scheduleAll(); // checkpoint 3 completed
+
+ // materialization 1 trigger
+ SequenceNumber materializationSqn = writer.nextSequenceNumber();
+ writer.truncate(
+ materializationSqn.compareTo(checkpoint1sqn) < 0
+ ? materializationSqn
+ : checkpoint1sqn);
+ // materialization 1 completed
+ // checkpoint 3 confirm
+ writer.confirm(materializationSqn, checkpoint3sqn, checkpointId);
+
+ writer.append(KEY_GROUP, getBytes(10));
+
+ SnapshotResult<ChangelogStateHandleStreamImpl> result3 =
future3.get();
+
+ for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+
result3.getJobManagerOwnedSnapshot().getHandlesAndOffsets()) {
+ assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
+ }
+
+ for (Tuple2<StreamStateHandle, Long> handleAndOffset :
+ result3.getTaskLocalSnapshot().getHandlesAndOffsets()) {
+ assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
+ }
+ }
+ }
+
+ @Test
+ void testLocalFileAbort() throws Exception {
+ long appendPersistThreshold = 100;
+ TaskChangelogRegistry taskChangelogRegistry =
+ new TaskChangelogRegistryImpl(Executors.directExecutor());
+
+ try (DiscardRecordableStateChangeUploader uploader =
+ new
DiscardRecordableStateChangeUploader(taskChangelogRegistry);
+ TestingBatchingUploadScheduler uploadScheduler =
+ new TestingBatchingUploadScheduler(uploader);
+ FsStateChangelogWriter writer =
+ new FsStateChangelogWriter(
+ UUID.randomUUID(),
+ KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
+ uploadScheduler,
+ appendPersistThreshold,
+ new SyncMailboxExecutor(),
+ taskChangelogRegistry,
+ TestLocalRecoveryConfig.enabledForTest(),
+ new LocalChangelogRegistryImpl(
+
Executors.newDirectExecutorService()))) {
+ SequenceNumber initialSqn = writer.initialSequenceNumber();
+
+ writer.append(KEY_GROUP, getBytes(10));
+
+ long checkpointId = 1L;
+ // checkpoint 1 trigger
+ SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
+ writer.persist(initialSqn, checkpointId);
+ uploadScheduler.scheduleAll(); // checkpoint 1 completed
+ writer.confirm(initialSqn, checkpoint1sqn, checkpointId);
+
+ writer.append(KEY_GROUP, getBytes(200));
+ uploadScheduler.scheduleAll();
+ writer.append(KEY_GROUP, getBytes(10));
+ // checkpoint 2 trigger
+ SequenceNumber checkpoint2sqn = writer.nextSequenceNumber();
+ CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
future2 =
+ writer.persist(initialSqn, ++checkpointId);
+ uploadScheduler.scheduleAll();
+
+ // checkpoint 2 abort, all local dstl files are deleted
+ writer.reset(initialSqn, checkpoint2sqn, checkpointId);
+ SnapshotResult<ChangelogStateHandleStreamImpl> result2 =
future2.get();
+ assertThat(result2.getTaskLocalSnapshot().getHandlesAndOffsets())
+ .allMatch(tuple -> uploader.isDiscarded(tuple.f0));
+
+ writer.append(KEY_GROUP, getBytes(10));
+ // checkpoint 3 trigger
+ SequenceNumber checkpoint3sqn = writer.nextSequenceNumber();
+ CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
future3 =
+ writer.persist(initialSqn, ++checkpointId);
+ uploadScheduler.scheduleAll(); // checkpoint 3 completed
+ SnapshotResult<ChangelogStateHandleStreamImpl> result3 =
future3.get();
+ // checkpoint 3 confirm, delete files of checkpoint 1,2
+ writer.confirm(initialSqn, checkpoint3sqn, checkpointId);
+ assertThat(result3.getTaskLocalSnapshot().getHandlesAndOffsets())
+ .anyMatch(tuple -> !uploader.isDiscarded(tuple.f0));
+ }
+ }
+
@Test
void testNoReUploadBeforeCompletion() throws Exception {
withWriter(
(writer, uploader) -> {
byte[] bytes = getBytes();
SequenceNumber sqn = append(writer, bytes);
- writer.persist(sqn);
+ writer.persist(sqn, 1L);
uploader.reset();
- writer.persist(sqn);
+ writer.persist(sqn, 2L);
assertNoUpload(uploader, "no re-upload should happen");
});
}
@@ -264,11 +479,11 @@ class FsStateChangelogWriterTest {
withWriter(
(writer, uploader) -> {
SequenceNumber sqn = append(writer, getBytes());
- writer.persist(sqn);
+ writer.persist(sqn, 1L);
uploader.reset();
byte[] bytes = getBytes();
sqn = append(writer, bytes);
- writer.persist(sqn);
+ writer.persist(sqn, 2L);
assertSubmittedOnly(uploader, bytes);
});
}
@@ -282,7 +497,7 @@ class FsStateChangelogWriterTest {
SequenceNumber sqn = append(writer, bytes);
writer.reset(sqn, SequenceNumber.of(Long.MAX_VALUE),
Long.MAX_VALUE);
uploader.reset();
- writer.persist(sqn);
+ writer.persist(sqn, 1L);
assertSubmittedOnly(uploader, bytes);
});
}
@@ -298,7 +513,7 @@ class FsStateChangelogWriterTest {
CompletableFuture<
SnapshotResult<
ChangelogStateHandleStreamImpl>>
- future =
writer.persist(sqn);
+ future =
writer.persist(sqn, 1L);
uploader.failUpload(new
RuntimeException("test"));
try {
future.get();
@@ -317,9 +532,9 @@ class FsStateChangelogWriterTest {
(writer, uploader) -> {
byte[] bytes = getBytes();
SequenceNumber sqn =
append(writer, bytes);
- writer.persist(sqn); // future
result ignored
+ writer.persist(sqn, 1L); // future
result ignored
uploader.failUpload(new
RuntimeException("test"));
- writer.persist(sqn); // should
fail right away
+ writer.persist(sqn, 2L); // should
fail right away
}))
.isInstanceOf(IOException.class);
}
@@ -330,12 +545,12 @@ class FsStateChangelogWriterTest {
(writer, uploader) -> {
byte[] bytes = getBytes();
SequenceNumber sqn1 = append(writer, bytes);
- writer.persist(sqn1); // future result ignored
+ writer.persist(sqn1, 1L); // future result ignored
uploader.failUpload(new RuntimeException("test"));
uploader.reset();
SequenceNumber sqn2 = append(writer, bytes);
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
- writer.persist(sqn2);
+ writer.persist(sqn2, 2L);
uploader.completeUpload();
future.get();
});
@@ -349,7 +564,7 @@ class FsStateChangelogWriterTest {
(writer, uploader) -> {
SequenceNumber sqn =
append(writer, getBytes());
writer.truncate(sqn.next());
- writer.persist(sqn);
+ writer.persist(sqn, 1L);
}))
.isInstanceOf(IllegalArgumentException.class);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
index a2ffb66b84d..678667951f3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
@@ -21,11 +21,15 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.state.StreamStateHandle;
+import java.io.Closeable;
+import java.io.IOException;
+
/** This registry is responsible for deleting changlog's local handles which
are not in use. */
@Internal
-public interface LocalChangelogRegistry {
+public interface LocalChangelogRegistry extends Closeable {
LocalChangelogRegistry NO_OP =
new LocalChangelogRegistry() {
+
@Override
public void register(StreamStateHandle handle, long
checkpointID) {}
@@ -33,7 +37,7 @@ public interface LocalChangelogRegistry {
public void discardUpToCheckpoint(long upTo) {}
@Override
- public void prune(long checkpointID) {}
+ public void close() throws IOException {}
};
/**
@@ -54,11 +58,4 @@ public interface LocalChangelogRegistry {
* @param upTo lowest CheckpointID which is still valid.
*/
void discardUpToCheckpoint(long upTo);
-
- /**
- * Called upon ChangelogKeyedStateBackend#notifyCheckpointAborted.
- *
- * @param checkpointID to abort
- */
- void prune(long checkpointID);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
index acb8ae318ca..0d6338cf14c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
@@ -26,15 +26,14 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
-import java.util.stream.Collectors;
@Internal
public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {
@@ -47,9 +46,9 @@ public class LocalChangelogRegistryImpl implements
LocalChangelogRegistry {
handleToLastUsedCheckpointID = new ConcurrentHashMap<>();
/** Executor for async state deletion. */
- private final Executor asyncDisposalExecutor;
+ private final ExecutorService asyncDisposalExecutor;
- public LocalChangelogRegistryImpl(Executor ioExecutor) {
+ public LocalChangelogRegistryImpl(ExecutorService ioExecutor) {
this.asyncDisposalExecutor = ioExecutor;
}
@@ -84,17 +83,6 @@ public class LocalChangelogRegistryImpl implements
LocalChangelogRegistry {
}
}
- public void prune(long checkpointID) {
- Set<StreamStateHandle> handles =
- handleToLastUsedCheckpointID.values().stream()
- .filter(tuple -> tuple.f1 == checkpointID)
- .map(tuple -> tuple.f0)
- .collect(Collectors.toSet());
- for (StreamStateHandle handle : handles) {
- scheduleAsyncDelete(handle);
- }
- }
-
private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
if (streamStateHandle != null) {
LOG.trace("Scheduled delete of state handle {}.",
streamStateHandle);
@@ -115,4 +103,10 @@ public class LocalChangelogRegistryImpl implements
LocalChangelogRegistry {
}
}
}
+
+ @Override
+ public void close() throws IOException {
+ asyncDisposalExecutor.shutdown();
+ handleToLastUsedCheckpointID.clear();
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index 337047d6d05..2c633f69e89 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -50,8 +50,10 @@ public interface StateChangelogWriter<Handle extends
ChangelogStateHandle> exten
* be called for the corresponding change set. with reset/truncate/confirm
methods?
*
* @param from inclusive
+ * @param checkpointId ID of the checkpoint this persist request is made for
*/
- CompletableFuture<SnapshotResult<Handle>> persist(SequenceNumber from)
throws IOException;
+ CompletableFuture<SnapshotResult<Handle>> persist(SequenceNumber from,
long checkpointId)
+ throws IOException;
/**
* Truncate this state changelog to free up the resources and collect any
garbage. That means:
@@ -79,13 +81,6 @@ public interface StateChangelogWriter<Handle extends
ChangelogStateHandle> exten
*/
void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
- /**
- * Mark the state changes of the given checkpoint as subsumed.
- *
- * @param checkpointId
- */
- void subsume(long checkpointId);
-
/**
* Reset the state the given state changes. Called upon abortion so that
if requested later then
* these changes will be re-uploaded.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 24d675d9a6c..62cd9f365d4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -95,7 +95,7 @@ class InMemoryStateChangelogWriter implements
StateChangelogWriter<InMemoryChang
@Override
public CompletableFuture<SnapshotResult<InMemoryChangelogStateHandle>>
persist(
- SequenceNumber from) {
+ SequenceNumber from, long checkpointId) {
LOG.debug("Persist after {}", from);
Preconditions.checkNotNull(from);
return completedFuture(
@@ -145,9 +145,6 @@ class InMemoryStateChangelogWriter implements
StateChangelogWriter<InMemoryChang
@Override
public void confirm(SequenceNumber from, SequenceNumber to, long
checkpointID) {}
- @Override
- public void subsume(long checkpointId) {}
-
@Override
public void reset(SequenceNumber from, SequenceNumber to, long
checkpointID) {}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
index 62f3f37bda2..a0782edd10f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
@@ -27,6 +27,10 @@ public class TestLocalRecoveryConfig {
return new LocalRecoveryConfig(null);
}
+ public static LocalRecoveryConfig enabledForTest() {
+ return new LocalRecoveryConfig(new TestDummyLocalDirectoryProvider());
+ }
+
public static class TestDummyLocalDirectoryProvider implements
LocalRecoveryDirectoryProvider {
private TestDummyLocalDirectoryProvider() {}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
index d28c33e603a..bb04e0c338d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
@@ -33,7 +33,7 @@ public class LocalChangelogRegistryTest extends TestLogger {
@Test
public void testRegistryNormal() {
LocalChangelogRegistry localStateRegistry =
- new LocalChangelogRegistryImpl(Executors.directExecutor());
+ new
LocalChangelogRegistryImpl(Executors.newDirectExecutorService());
TestingStreamStateHandle handle1 = new TestingStreamStateHandle();
TestingStreamStateHandle handle2 = new TestingStreamStateHandle();
// checkpoint 1: handle1, handle2
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index 44e8ed8319a..c0f0ebe6a43 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -100,7 +100,7 @@ public class StateChangelogStorageTest<T extends
ChangelogStateHandle> {
writer.nextSequenceNumber();
}
- SnapshotResult<T> res = writer.persist(prev).get();
+ SnapshotResult<T> res = writer.persist(prev, 1).get();
T jmHandle = res.getJobManagerOwnedSnapshot();
StateChangelogHandleReader<T> reader = client.createReader();
assertByteMapsEqual(appendsByKeyGroup, extract(jmHandle, reader));
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 77d0f33b61b..dbce58687d5 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -403,7 +403,7 @@ public class ChangelogKeyedStateBackend<K>
return toRunnableFuture(
stateChangelogWriter
- .persist(lastUploadedFrom)
+ .persist(lastUploadedFrom, checkpointId)
.thenApply(
delta ->
buildSnapshotResult(
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
index 12b5d3388f2..6a6b71868b5 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
@@ -70,7 +70,6 @@ class ChangelogTruncateHelper {
subsumedUpTo = sqn;
checkpointedUpTo.headMap(checkpointId, true).clear();
truncate();
- stateChangelogWriter.subsume(checkpointId);
}
}
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index e00b85c1ff8..bd57c8a3680 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -125,7 +125,8 @@ abstract class StateChangeLoggerTestBase<Namespace> {
}
@Override
- public CompletableFuture<?> persist(SequenceNumber from) throws
IOException {
+ public CompletableFuture<?> persist(SequenceNumber from, long
checkpointId)
+ throws IOException {
throw new UnsupportedOperationException();
}
@@ -135,9 +136,6 @@ abstract class StateChangeLoggerTestBase<Namespace> {
@Override
public void confirm(SequenceNumber from, SequenceNumber to, long
checkpointId) {}
- @Override
- public void subsume(long checkpointId) {}
-
@Override
public void reset(SequenceNumber from, SequenceNumber to, long
checkpointId) {}