This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 543e885 [FLINK-26592][state/changelog] Use mailbox in
FsStateChangelogWriter instead of a lock
543e885 is described below
commit 543e8854b8f6b1e0b6997383de0691d2e9fffba7
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Mar 10 23:16:54 2022 +0100
[FLINK-26592][state/changelog] Use mailbox in FsStateChangelogWriter
instead of a lock
When a task thread tries to schedule an upload, it might wait for available
capacity.
Capacity is released by the uploading thread on upload completion. After
releasing,
it must notify the task thread about the completion.
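Both the task thread and the uploading thread acquire
FsStateChangelogWriter.lock. The task thread holds the lock while it waits for
capacity, so the uploader blocks in its completion callback. That causes
a deadlock if the capacity released by the uploader so far is insufficient for
the task thread to proceed.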
This change removes the lock and makes the uploader thread use mailbox
actions instead.
---
.../changelog/fs/FsStateChangelogStorage.java | 6 +-
.../flink/changelog/fs/FsStateChangelogWriter.java | 146 ++++++++++-----------
.../fs/BatchingStateChangeUploadSchedulerTest.java | 4 +-
.../changelog/fs/ChangelogStorageMetricsTest.java | 19 ++-
.../changelog/fs/FsStateChangelogStorageTest.java | 68 ++++++++++
.../fs/FsStateChangelogWriterSqnTest.java | 4 +-
.../changelog/fs/FsStateChangelogWriterTest.java | 4 +-
.../state/changelog/StateChangelogStorage.java | 4 +-
.../inmemory/InMemoryStateChangelogStorage.java | 3 +-
...kExecutorStateChangelogStoragesManagerTest.java | 3 +-
.../inmemory/StateChangelogStorageLoaderTest.java | 3 +-
.../inmemory/StateChangelogStorageTest.java | 10 +-
.../state/changelog/ChangelogStateBackend.java | 4 +-
.../changelog/ChangelogKeyedStateBackendTest.java | 2 +-
14 files changed, 185 insertions(+), 95 deletions(-)
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index 9c8bcdc..adeabed 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -19,6 +19,7 @@ package org.apache.flink.changelog.fs;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.AvailabilityProvider;
@@ -91,11 +92,12 @@ public class FsStateChangelogStorage
}
@Override
- public FsStateChangelogWriter createWriter(String operatorID,
KeyGroupRange keyGroupRange) {
+ public FsStateChangelogWriter createWriter(
+ String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor
mailboxExecutor) {
UUID logId = new UUID(0, logIdGenerator.getAndIncrement());
LOG.info("createWriter for operator {}/{}: {}", operatorID,
keyGroupRange, logId);
return new FsStateChangelogWriter(
- logId, keyGroupRange, uploader,
preEmptivePersistThresholdInBytes);
+ logId, keyGroupRange, uploader,
preEmptivePersistThresholdInBytes, mailboxExecutor);
}
@Override
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index b6537a7..87e6240 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -18,6 +18,7 @@
package org.apache.flink.changelog.fs;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
@@ -46,7 +46,6 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.flink.util.IOUtils.closeAllQuietly;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -94,12 +93,7 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
private final StateChangeUploadScheduler uploader;
private final long preEmptivePersistThresholdInBytes;
- /** Lock to synchronize handling of upload completion with new upload
requests. */
- // todo: replace with mailbox executor (after FLINK-23204)
- private final Object lock = new Object();
-
/** A list of listener per upload (~ per checkpoint plus pre-emptive
uploads). */
- @GuardedBy("lock")
private final List<UploadCompletionListener> uploadCompletionListeners =
new ArrayList<>();
/** Current {@link SequenceNumber}. */
@@ -109,7 +103,6 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
* {@link SequenceNumber} before which changes will NOT be requested,
exclusive. Increased after
* materialization.
*/
- @GuardedBy("lock")
private SequenceNumber lowestSequenceNumber = INITIAL_SQN;
/**
@@ -127,28 +120,28 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
private final NavigableMap<SequenceNumber, StateChangeSet> notUploaded =
new TreeMap<>();
/** Uploaded changes, ready for use in snapshots. */
- @GuardedBy("lock")
private final NavigableMap<SequenceNumber, UploadResult> uploaded = new
TreeMap<>();
/**
* Highest {@link SequenceNumber} for which upload has failed (won't be
restarted), inclusive.
*/
- @Nullable
- @GuardedBy("lock")
- private Tuple2<SequenceNumber, Throwable> highestFailed;
+ @Nullable private Tuple2<SequenceNumber, Throwable> highestFailed;
- @GuardedBy("lock")
private boolean closed;
+ private final MailboxExecutor mailboxExecutor;
+
FsStateChangelogWriter(
UUID logId,
KeyGroupRange keyGroupRange,
StateChangeUploadScheduler uploader,
- long preEmptivePersistThresholdInBytes) {
+ long preEmptivePersistThresholdInBytes,
+ MailboxExecutor mailboxExecutor) {
this.logId = logId;
this.keyGroupRange = keyGroupRange;
this.uploader = uploader;
this.preEmptivePersistThresholdInBytes =
preEmptivePersistThresholdInBytes;
+ this.mailboxExecutor = mailboxExecutor;
}
@Override
@@ -194,87 +187,90 @@ class FsStateChangelogWriter implements
StateChangelogWriter<ChangelogStateHandl
private CompletableFuture<ChangelogStateHandleStreamImpl>
persistInternal(SequenceNumber from)
throws IOException {
- synchronized (lock) {
- ensureCanPersist(from);
- rollover();
- Map<SequenceNumber, StateChangeSet> toUpload =
drainTailMap(notUploaded, from);
- NavigableMap<SequenceNumber, UploadResult> readyToReturn =
uploaded.tailMap(from, true);
- LOG.debug("collected readyToReturn: {}, toUpload: {}",
readyToReturn, toUpload);
-
- SequenceNumberRange range = SequenceNumberRange.generic(from,
activeSequenceNumber);
- if (range.size() == readyToReturn.size()) {
- checkState(toUpload.isEmpty());
- return completedFuture(buildHandle(keyGroupRange,
readyToReturn, 0L));
- } else {
- CompletableFuture<ChangelogStateHandleStreamImpl> future =
- new CompletableFuture<>();
- uploadCompletionListeners.add(
- new UploadCompletionListener(keyGroupRange, range,
readyToReturn, future));
- if (!toUpload.isEmpty()) {
- uploader.upload(
- new UploadTask(
- toUpload.values(),
- this::handleUploadSuccess,
- this::handleUploadFailure));
- }
- return future;
+ ensureCanPersist(from);
+ rollover();
+ Map<SequenceNumber, StateChangeSet> toUpload =
drainTailMap(notUploaded, from);
+ NavigableMap<SequenceNumber, UploadResult> readyToReturn =
uploaded.tailMap(from, true);
+ LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn,
toUpload);
+
+ SequenceNumberRange range = SequenceNumberRange.generic(from,
activeSequenceNumber);
+ if (range.size() == readyToReturn.size()) {
+ checkState(toUpload.isEmpty());
+ return
CompletableFuture.completedFuture(buildHandle(keyGroupRange, readyToReturn,
0L));
+ } else {
+ CompletableFuture<ChangelogStateHandleStreamImpl> future = new
CompletableFuture<>();
+ uploadCompletionListeners.add(
+ new UploadCompletionListener(keyGroupRange, range,
readyToReturn, future));
+ if (!toUpload.isEmpty()) {
+ UploadTask uploadTask =
+ new UploadTask(
+ toUpload.values(),
+ this::handleUploadSuccess,
+ this::handleUploadFailure);
+ uploader.upload(uploadTask);
}
+ return future;
}
}
private void handleUploadFailure(List<SequenceNumber> failedSqn, Throwable
throwable) {
- synchronized (lock) {
- if (closed) {
- return;
- }
- uploadCompletionListeners.removeIf(
- listener -> listener.onFailure(failedSqn, throwable));
- failedSqn.stream()
- .max(Comparator.naturalOrder())
- .filter(sqn -> sqn.compareTo(lowestSequenceNumber) >= 0)
- .filter(sqn -> highestFailed == null ||
sqn.compareTo(highestFailed.f0) > 0)
- .ifPresent(sqn -> highestFailed = Tuple2.of(sqn,
throwable));
- }
+ mailboxExecutor.execute(
+ () -> {
+ if (closed) {
+ return;
+ }
+ uploadCompletionListeners.removeIf(
+ listener -> listener.onFailure(failedSqn,
throwable));
+ failedSqn.stream()
+ .max(Comparator.naturalOrder())
+ .filter(sqn -> sqn.compareTo(lowestSequenceNumber)
>= 0)
+ .filter(
+ sqn ->
+ highestFailed == null
+ ||
sqn.compareTo(highestFailed.f0) > 0)
+ .ifPresent(sqn -> highestFailed = Tuple2.of(sqn,
throwable));
+ },
+ "handleUploadFailure");
}
private void handleUploadSuccess(List<UploadResult> results) {
- synchronized (lock) {
- if (closed) {
- results.forEach(
- r -> closeAllQuietly(() ->
r.getStreamStateHandle().discardState()));
- } else {
- uploadCompletionListeners.removeIf(listener ->
listener.onSuccess(results));
- for (UploadResult result : results) {
- if (result.sequenceNumber.compareTo(lowestSequenceNumber)
>= 0) {
- uploaded.put(result.sequenceNumber, result);
+ mailboxExecutor.execute(
+ () -> {
+ if (closed) {
+ results.forEach(
+ r ->
+ closeAllQuietly(
+ () ->
r.getStreamStateHandle().discardState()));
+ } else {
+ uploadCompletionListeners.removeIf(listener ->
listener.onSuccess(results));
+ for (UploadResult result : results) {
+ if
(result.sequenceNumber.compareTo(lowestSequenceNumber) >= 0) {
+ uploaded.put(result.sequenceNumber, result);
+ }
+ }
}
- }
- }
- }
+ },
+ "handleUploadSuccess");
}
@Override
public void close() {
LOG.debug("close {}", logId);
- synchronized (lock) {
- checkState(!closed);
- closed = true;
- activeChangeSet.clear();
- activeChangeSetSize = 0;
- notUploaded.clear();
- uploaded.clear();
- }
+ checkState(!closed);
+ closed = true;
+ activeChangeSet.clear();
+ activeChangeSetSize = 0;
+ notUploaded.clear();
+ uploaded.clear();
}
@Override
public void truncate(SequenceNumber to) {
LOG.debug("truncate {} to sqn {} (excl.)", logId, to);
checkArgument(to.compareTo(activeSequenceNumber) <= 0);
- synchronized (lock) {
- lowestSequenceNumber = to;
- notUploaded.headMap(lowestSequenceNumber, false).clear();
- uploaded.headMap(lowestSequenceNumber, false).clear();
- }
+ lowestSequenceNumber = to;
+ notUploaded.headMap(lowestSequenceNumber, false).clear();
+ uploaded.headMap(lowestSequenceNumber, false).clear();
}
private void rollover() {
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index e12f1f2..2b13d91 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -429,7 +429,7 @@ public class BatchingStateChangeUploadSchedulerTest {
return Tuple2.of(thread, future);
}
- private static final class BlockingUploader implements StateChangeUploader
{
+ static final class BlockingUploader implements StateChangeUploader {
private final AtomicBoolean blocking = new AtomicBoolean(true);
private final AtomicInteger uploadsCounter = new AtomicInteger();
@@ -449,7 +449,7 @@ public class BatchingStateChangeUploadSchedulerTest {
@Override
public void close() {}
- private void unblock() {
+ void unblock() {
blocking.set(false);
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index a4e9e55..3444139 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
@@ -61,7 +62,7 @@ public class ChangelogStorageMetricsTest {
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
Path.fromLocalFile(temporaryFolder.newFolder()),
false, 100, metrics)) {
- FsStateChangelogWriter writer = storage.createWriter("writer",
EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);
int numUploads = 5;
for (int i = 0; i < numUploads; i++) {
@@ -82,7 +83,7 @@ public class ChangelogStorageMetricsTest {
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
Path.fromLocalFile(temporaryFolder.newFolder()),
false, 100, metrics)) {
- FsStateChangelogWriter writer = storage.createWriter("writer",
EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);
// upload single byte to infer header size
SequenceNumber from = writer.nextSequenceNumber();
@@ -108,7 +109,7 @@ public class ChangelogStorageMetricsTest {
new
ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(Path.fromLocalFile(file), false,
100, metrics)) {
- FsStateChangelogWriter writer = storage.createWriter("writer",
EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);
int numUploads = 5;
for (int i = 0; i < numUploads; i++) {
@@ -149,7 +150,9 @@ public class ChangelogStorageMetricsTest {
FsStateChangelogStorage storage = new FsStateChangelogStorage(batcher,
Integer.MAX_VALUE);
FsStateChangelogWriter[] writers = new
FsStateChangelogWriter[numWriters];
for (int i = 0; i < numWriters; i++) {
- writers[i] = storage.createWriter(Integer.toString(i),
EMPTY_KEY_GROUP_RANGE);
+ writers[i] =
+ storage.createWriter(
+ Integer.toString(i), EMPTY_KEY_GROUP_RANGE, new
SyncMailboxExecutor());
}
try {
@@ -190,7 +193,7 @@ public class ChangelogStorageMetricsTest {
metrics);
FsStateChangelogStorage storage = new FsStateChangelogStorage(batcher,
Integer.MAX_VALUE);
- FsStateChangelogWriter writer = storage.createWriter("writer",
EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);
try {
for (int upload = 0; upload < numUploads; upload++) {
@@ -242,7 +245,7 @@ public class ChangelogStorageMetricsTest {
metrics);
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(batcher, Long.MAX_VALUE)) {
- FsStateChangelogWriter writer = storage.createWriter("writer",
EMPTY_KEY_GROUP_RANGE);
+ FsStateChangelogWriter writer = createWriter(storage);
int numUploads = 11;
for (int i = 0; i < numUploads; i++) {
SequenceNumber from = writer.nextSequenceNumber();
@@ -288,4 +291,8 @@ public class ChangelogStorageMetricsTest {
attemptsPerTask.clear();
}
}
+
+ private FsStateChangelogWriter createWriter(FsStateChangelogStorage
storage) {
+ return storage.createWriter("writer", EMPTY_KEY_GROUP_RANGE, new
SyncMailboxExecutor());
+ }
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
index 6179be6..602155d 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
@@ -17,14 +17,24 @@
package org.apache.flink.changelog.fs;
+import
org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.BlockingUploader;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import
org.apache.flink.runtime.state.changelog.inmemory.StateChangelogStorageTest;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import static
org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
@@ -46,4 +56,62 @@ public class FsStateChangelogStorageTest extends
StateChangelogStorageTest {
1024 * 1024 * 10,
createUnregisteredChangelogStorageMetricGroup());
}
+
+ /**
+ * Provoke a deadlock between task and uploader threads which might happen
during waiting for
+ * capacity and upload completion.
+ */
+ @Test
+ public void testDeadlockOnUploadCompletion() throws Throwable {
+ int capacity = 10; // in bytes, allow the first two uploads without
waiting (see below)
+ CountDownLatch remainingUploads = new CountDownLatch(3);
+ BlockingUploader blockingUploader = new BlockingUploader();
+ CompletableFuture<Void> unblockFuture = new CompletableFuture<>();
+ new Thread(
+ () -> {
+ try {
+ remainingUploads.await();
+ blockingUploader.unblock();
+ unblockFuture.complete(null);
+ } catch (Throwable e) {
+ unblockFuture.completeExceptionally(e);
+ }
+ })
+ .start();
+ MailboxExecutorImpl mailboxExecutor =
+ new MailboxExecutorImpl(
+ new TaskMailboxImpl(), 0,
StreamTaskActionExecutor.IMMEDIATE);
+ try (BatchingStateChangeUploadScheduler scheduler =
+ new BatchingStateChangeUploadScheduler(
+ 0, // schedule immediately
+ 0, // schedule immediately
+ RetryPolicy.NONE,
+ blockingUploader,
+ 1,
+ capacity,
+
createUnregisteredChangelogStorageMetricGroup()) {
+ @Override
+ public void upload(UploadTask uploadTask) throws
IOException {
+ remainingUploads.countDown();
+ super.upload(uploadTask);
+ }
+ };
+ StateChangelogWriter<?> writer =
+ new FsStateChangelogStorage(scheduler, 0 /* persist
immediately */)
+ .createWriter(
+ new OperatorID().toString(),
+ KeyGroupRange.of(0, 0),
+ mailboxExecutor); ) {
+ // 1. start with 1-byte request - releasing only it will NOT allow
proceeding in 3, but
+ // still involves completion callback, which can deadlock
+ writer.append(0, new byte[1]);
+ // 2. exceed capacity
+ writer.append(0, new byte[capacity]);
+ // 3. current thread will block until both previous requests are
completed
+ // verify that completion can proceed while this thread is waiting
+ writer.append(0, new byte[1]);
+ }
+ // check unblocking thread exit status
+ unblockFuture.join();
+ }
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index 4d4a37d..f3e576d 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.changelog.fs;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -86,7 +87,8 @@ public class FsStateChangelogWriterSqnTest {
KeyGroupRange.of(0, 0),
StateChangeUploadScheduler.directScheduler(
new TestingStateChangeUploader()),
- Long.MAX_VALUE)) {
+ Long.MAX_VALUE,
+ new SyncMailboxExecutor())) {
if (test.withAppend) {
append(writer);
}
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index 64addf7..b0a49dd 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.changelog.fs;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
@@ -211,7 +212,8 @@ public class FsStateChangelogWriterTest {
UUID.randomUUID(),
KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
StateChangeUploadScheduler.directScheduler(uploader),
- appendPersistThreshold)) {
+ appendPersistThreshold,
+ new SyncMailboxExecutor())) {
test.accept(writer, uploader);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
index 0bc26e3..14794b4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.changelog;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -30,7 +31,8 @@ import org.apache.flink.runtime.state.KeyGroupRange;
@Internal
public interface StateChangelogStorage<Handle extends ChangelogStateHandle>
extends AutoCloseable {
- StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange
keyGroupRange);
+ StateChangelogWriter<Handle> createWriter(
+ String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor
mailboxExecutor);
StateChangelogHandleReader<Handle> createReader();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
index c9bfdfb..6f49407 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.state.changelog.inmemory;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -28,7 +29,7 @@ public class InMemoryStateChangelogStorage
@Override
public InMemoryStateChangelogWriter createWriter(
- String operatorID, KeyGroupRange keyGroupRange) {
+ String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor
mailboxExecutor) {
return new InMemoryStateChangelogWriter(keyGroupRange);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
index d3f1767..db3502a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.plugin.PluginManager;
@@ -166,7 +167,7 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
@Override
public StateChangelogWriter<ChangelogStateHandle> createWriter(
- String operatorID, KeyGroupRange keyGroupRange) {
+ String operatorID, KeyGroupRange keyGroupRange,
MailboxExecutor mailboxExecutor) {
return null;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 6600ff4..8fdfc26 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.state.changelog.inmemory;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.plugin.PluginManager;
@@ -92,7 +93,7 @@ public class StateChangelogStorageLoaderTest {
implements StateChangelogStorage<ChangelogStateHandle> {
@Override
public StateChangelogWriter<ChangelogStateHandle> createWriter(
- String operatorID, KeyGroupRange keyGroupRange) {
+ String operatorID, KeyGroupRange keyGroupRange,
MailboxExecutor mailboxExecutor) {
return null;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index d5e3303..d7ddf40 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.changelog.inmemory;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
@@ -58,7 +59,11 @@ public class StateChangelogStorageTest<T extends
ChangelogStateHandle> {
@Test(expected = IllegalStateException.class)
public void testNoAppendAfterClose() throws IOException {
StateChangelogWriter<?> writer =
- getFactory().createWriter(new OperatorID().toString(),
KeyGroupRange.of(0, 0));
+ getFactory()
+ .createWriter(
+ new OperatorID().toString(),
+ KeyGroupRange.of(0, 0),
+ new SyncMailboxExecutor());
writer.close();
writer.append(0, new byte[0]);
}
@@ -70,7 +75,8 @@ public class StateChangelogStorageTest<T extends
ChangelogStateHandle> {
try (StateChangelogStorage<T> client = getFactory();
StateChangelogWriter<T> writer =
- client.createWriter(new OperatorID().toString(),
kgRange)) {
+ client.createWriter(
+ new OperatorID().toString(), kgRange, new
SyncMailboxExecutor())) {
SequenceNumber prev = writer.initialSequenceNumber();
for (Map.Entry<Integer, List<byte[]>> entry :
appendsByKeyGroup.entrySet()) {
Integer group = entry.getKey();
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index 863a533..afcd17a 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -234,7 +234,9 @@ public class ChangelogStateBackend implements
DelegatingStateBackend, Configurab
executionConfig,
ttlTimeProvider,
changelogStorage.createWriter(
- operatorIdentifier,
keyGroupRange),
+ operatorIdentifier,
+ keyGroupRange,
+ env.getMainMailboxExecutor()),
baseState,
env.getCheckpointStorageAccess()));
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
index 4f9c910..264fb6a 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
@@ -109,7 +109,7 @@ public class ChangelogKeyedStateBackendTest {
new ExecutionConfig(),
TtlTimeProvider.DEFAULT,
new InMemoryStateChangelogStorage()
- .createWriter("test",
KeyGroupRange.EMPTY_KEY_GROUP_RANGE),
+ .createWriter("test",
KeyGroupRange.EMPTY_KEY_GROUP_RANGE, null),
emptyList(),
new DummyCheckpointingStorageAccess());
}