This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 543e885  [FLINK-26592][state/changelog] Use mailbox in FsStateChangelogWriter instead of a lock
543e885 is described below

commit 543e8854b8f6b1e0b6997383de0691d2e9fffba7
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Mar 10 23:16:54 2022 +0100

    [FLINK-26592][state/changelog] Use mailbox in FsStateChangelogWriter instead of a lock
    
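    When a task thread tries to schedule an upload, it might wait for available capacity.
    Capacity is released by the uploading thread on upload completion. After releasing it,
    the uploading thread must notify the task thread about the completion.
    Both the task thread and the uploading thread acquire FsStateChangelogWriter.lock. This
    causes a deadlock if the uploader releases capacity that is insufficient for the task
    thread to proceed: the task thread keeps waiting for capacity while holding the lock,
    and the uploading thread blocks on that same lock when handling the upload completion.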
    
    This change removes the lock and makes the uploading thread submit its completion
    handling as mailbox actions instead.
---
 .../changelog/fs/FsStateChangelogStorage.java      |   6 +-
 .../flink/changelog/fs/FsStateChangelogWriter.java | 146 ++++++++++-----------
 .../fs/BatchingStateChangeUploadSchedulerTest.java |   4 +-
 .../changelog/fs/ChangelogStorageMetricsTest.java  |  19 ++-
 .../changelog/fs/FsStateChangelogStorageTest.java  |  68 ++++++++++
 .../fs/FsStateChangelogWriterSqnTest.java          |   4 +-
 .../changelog/fs/FsStateChangelogWriterTest.java   |   4 +-
 .../state/changelog/StateChangelogStorage.java     |   4 +-
 .../inmemory/InMemoryStateChangelogStorage.java    |   3 +-
 ...kExecutorStateChangelogStoragesManagerTest.java |   3 +-
 .../inmemory/StateChangelogStorageLoaderTest.java  |   3 +-
 .../inmemory/StateChangelogStorageTest.java        |  10 +-
 .../state/changelog/ChangelogStateBackend.java     |   4 +-
 .../changelog/ChangelogKeyedStateBackendTest.java  |   2 +-
 14 files changed, 185 insertions(+), 95 deletions(-)

diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index 9c8bcdc..adeabed 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -19,6 +19,7 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.AvailabilityProvider;
@@ -91,11 +92,12 @@ public class FsStateChangelogStorage
     }
 
     @Override
-    public FsStateChangelogWriter createWriter(String operatorID, 
KeyGroupRange keyGroupRange) {
+    public FsStateChangelogWriter createWriter(
+            String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor 
mailboxExecutor) {
         UUID logId = new UUID(0, logIdGenerator.getAndIncrement());
         LOG.info("createWriter for operator {}/{}: {}", operatorID, 
keyGroupRange, logId);
         return new FsStateChangelogWriter(
-                logId, keyGroupRange, uploader, 
preEmptivePersistThresholdInBytes);
+                logId, keyGroupRange, uploader, 
preEmptivePersistThresholdInBytes, mailboxExecutor);
     }
 
     @Override
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index b6537a7..87e6240 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
@@ -46,7 +46,6 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.flink.util.IOUtils.closeAllQuietly;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -94,12 +93,7 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
     private final StateChangeUploadScheduler uploader;
     private final long preEmptivePersistThresholdInBytes;
 
-    /** Lock to synchronize handling of upload completion with new upload 
requests. */
-    // todo: replace with mailbox executor (after FLINK-23204)
-    private final Object lock = new Object();
-
     /** A list of listener per upload (~ per checkpoint plus pre-emptive 
uploads). */
-    @GuardedBy("lock")
     private final List<UploadCompletionListener> uploadCompletionListeners = 
new ArrayList<>();
 
     /** Current {@link SequenceNumber}. */
@@ -109,7 +103,6 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
      * {@link SequenceNumber} before which changes will NOT be requested, 
exclusive. Increased after
      * materialization.
      */
-    @GuardedBy("lock")
     private SequenceNumber lowestSequenceNumber = INITIAL_SQN;
 
     /**
@@ -127,28 +120,28 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
     private final NavigableMap<SequenceNumber, StateChangeSet> notUploaded = 
new TreeMap<>();
 
     /** Uploaded changes, ready for use in snapshots. */
-    @GuardedBy("lock")
     private final NavigableMap<SequenceNumber, UploadResult> uploaded = new 
TreeMap<>();
 
     /**
      * Highest {@link SequenceNumber} for which upload has failed (won't be 
restarted), inclusive.
      */
-    @Nullable
-    @GuardedBy("lock")
-    private Tuple2<SequenceNumber, Throwable> highestFailed;
+    @Nullable private Tuple2<SequenceNumber, Throwable> highestFailed;
 
-    @GuardedBy("lock")
     private boolean closed;
 
+    private final MailboxExecutor mailboxExecutor;
+
     FsStateChangelogWriter(
             UUID logId,
             KeyGroupRange keyGroupRange,
             StateChangeUploadScheduler uploader,
-            long preEmptivePersistThresholdInBytes) {
+            long preEmptivePersistThresholdInBytes,
+            MailboxExecutor mailboxExecutor) {
         this.logId = logId;
         this.keyGroupRange = keyGroupRange;
         this.uploader = uploader;
         this.preEmptivePersistThresholdInBytes = 
preEmptivePersistThresholdInBytes;
+        this.mailboxExecutor = mailboxExecutor;
     }
 
     @Override
@@ -194,87 +187,90 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
 
     private CompletableFuture<ChangelogStateHandleStreamImpl> 
persistInternal(SequenceNumber from)
             throws IOException {
-        synchronized (lock) {
-            ensureCanPersist(from);
-            rollover();
-            Map<SequenceNumber, StateChangeSet> toUpload = 
drainTailMap(notUploaded, from);
-            NavigableMap<SequenceNumber, UploadResult> readyToReturn = 
uploaded.tailMap(from, true);
-            LOG.debug("collected readyToReturn: {}, toUpload: {}", 
readyToReturn, toUpload);
-
-            SequenceNumberRange range = SequenceNumberRange.generic(from, 
activeSequenceNumber);
-            if (range.size() == readyToReturn.size()) {
-                checkState(toUpload.isEmpty());
-                return completedFuture(buildHandle(keyGroupRange, 
readyToReturn, 0L));
-            } else {
-                CompletableFuture<ChangelogStateHandleStreamImpl> future =
-                        new CompletableFuture<>();
-                uploadCompletionListeners.add(
-                        new UploadCompletionListener(keyGroupRange, range, 
readyToReturn, future));
-                if (!toUpload.isEmpty()) {
-                    uploader.upload(
-                            new UploadTask(
-                                    toUpload.values(),
-                                    this::handleUploadSuccess,
-                                    this::handleUploadFailure));
-                }
-                return future;
+        ensureCanPersist(from);
+        rollover();
+        Map<SequenceNumber, StateChangeSet> toUpload = 
drainTailMap(notUploaded, from);
+        NavigableMap<SequenceNumber, UploadResult> readyToReturn = 
uploaded.tailMap(from, true);
+        LOG.debug("collected readyToReturn: {}, toUpload: {}", readyToReturn, 
toUpload);
+
+        SequenceNumberRange range = SequenceNumberRange.generic(from, 
activeSequenceNumber);
+        if (range.size() == readyToReturn.size()) {
+            checkState(toUpload.isEmpty());
+            return 
CompletableFuture.completedFuture(buildHandle(keyGroupRange, readyToReturn, 
0L));
+        } else {
+            CompletableFuture<ChangelogStateHandleStreamImpl> future = new 
CompletableFuture<>();
+            uploadCompletionListeners.add(
+                    new UploadCompletionListener(keyGroupRange, range, 
readyToReturn, future));
+            if (!toUpload.isEmpty()) {
+                UploadTask uploadTask =
+                        new UploadTask(
+                                toUpload.values(),
+                                this::handleUploadSuccess,
+                                this::handleUploadFailure);
+                uploader.upload(uploadTask);
             }
+            return future;
         }
     }
 
     private void handleUploadFailure(List<SequenceNumber> failedSqn, Throwable 
throwable) {
-        synchronized (lock) {
-            if (closed) {
-                return;
-            }
-            uploadCompletionListeners.removeIf(
-                    listener -> listener.onFailure(failedSqn, throwable));
-            failedSqn.stream()
-                    .max(Comparator.naturalOrder())
-                    .filter(sqn -> sqn.compareTo(lowestSequenceNumber) >= 0)
-                    .filter(sqn -> highestFailed == null || 
sqn.compareTo(highestFailed.f0) > 0)
-                    .ifPresent(sqn -> highestFailed = Tuple2.of(sqn, 
throwable));
-        }
+        mailboxExecutor.execute(
+                () -> {
+                    if (closed) {
+                        return;
+                    }
+                    uploadCompletionListeners.removeIf(
+                            listener -> listener.onFailure(failedSqn, 
throwable));
+                    failedSqn.stream()
+                            .max(Comparator.naturalOrder())
+                            .filter(sqn -> sqn.compareTo(lowestSequenceNumber) 
>= 0)
+                            .filter(
+                                    sqn ->
+                                            highestFailed == null
+                                                    || 
sqn.compareTo(highestFailed.f0) > 0)
+                            .ifPresent(sqn -> highestFailed = Tuple2.of(sqn, 
throwable));
+                },
+                "handleUploadFailure");
     }
 
     private void handleUploadSuccess(List<UploadResult> results) {
-        synchronized (lock) {
-            if (closed) {
-                results.forEach(
-                        r -> closeAllQuietly(() -> 
r.getStreamStateHandle().discardState()));
-            } else {
-                uploadCompletionListeners.removeIf(listener -> 
listener.onSuccess(results));
-                for (UploadResult result : results) {
-                    if (result.sequenceNumber.compareTo(lowestSequenceNumber) 
>= 0) {
-                        uploaded.put(result.sequenceNumber, result);
+        mailboxExecutor.execute(
+                () -> {
+                    if (closed) {
+                        results.forEach(
+                                r ->
+                                        closeAllQuietly(
+                                                () -> 
r.getStreamStateHandle().discardState()));
+                    } else {
+                        uploadCompletionListeners.removeIf(listener -> 
listener.onSuccess(results));
+                        for (UploadResult result : results) {
+                            if 
(result.sequenceNumber.compareTo(lowestSequenceNumber) >= 0) {
+                                uploaded.put(result.sequenceNumber, result);
+                            }
+                        }
                     }
-                }
-            }
-        }
+                },
+                "handleUploadSuccess");
     }
 
     @Override
     public void close() {
         LOG.debug("close {}", logId);
-        synchronized (lock) {
-            checkState(!closed);
-            closed = true;
-            activeChangeSet.clear();
-            activeChangeSetSize = 0;
-            notUploaded.clear();
-            uploaded.clear();
-        }
+        checkState(!closed);
+        closed = true;
+        activeChangeSet.clear();
+        activeChangeSetSize = 0;
+        notUploaded.clear();
+        uploaded.clear();
     }
 
     @Override
     public void truncate(SequenceNumber to) {
         LOG.debug("truncate {} to sqn {} (excl.)", logId, to);
         checkArgument(to.compareTo(activeSequenceNumber) <= 0);
-        synchronized (lock) {
-            lowestSequenceNumber = to;
-            notUploaded.headMap(lowestSequenceNumber, false).clear();
-            uploaded.headMap(lowestSequenceNumber, false).clear();
-        }
+        lowestSequenceNumber = to;
+        notUploaded.headMap(lowestSequenceNumber, false).clear();
+        uploaded.headMap(lowestSequenceNumber, false).clear();
     }
 
     private void rollover() {
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index e12f1f2..2b13d91 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -429,7 +429,7 @@ public class BatchingStateChangeUploadSchedulerTest {
         return Tuple2.of(thread, future);
     }
 
-    private static final class BlockingUploader implements StateChangeUploader 
{
+    static final class BlockingUploader implements StateChangeUploader {
         private final AtomicBoolean blocking = new AtomicBoolean(true);
         private final AtomicInteger uploadsCounter = new AtomicInteger();
 
@@ -449,7 +449,7 @@ public class BatchingStateChangeUploadSchedulerTest {
         @Override
         public void close() {}
 
-        private void unblock() {
+        void unblock() {
             blocking.set(false);
         }
 
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index a4e9e55..3444139 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
@@ -61,7 +62,7 @@ public class ChangelogStorageMetricsTest {
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
                         Path.fromLocalFile(temporaryFolder.newFolder()), 
false, 100, metrics)) {
-            FsStateChangelogWriter writer = storage.createWriter("writer", 
EMPTY_KEY_GROUP_RANGE);
+            FsStateChangelogWriter writer = createWriter(storage);
 
             int numUploads = 5;
             for (int i = 0; i < numUploads; i++) {
@@ -82,7 +83,7 @@ public class ChangelogStorageMetricsTest {
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
                         Path.fromLocalFile(temporaryFolder.newFolder()), 
false, 100, metrics)) {
-            FsStateChangelogWriter writer = storage.createWriter("writer", 
EMPTY_KEY_GROUP_RANGE);
+            FsStateChangelogWriter writer = createWriter(storage);
 
             // upload single byte to infer header size
             SequenceNumber from = writer.nextSequenceNumber();
@@ -108,7 +109,7 @@ public class ChangelogStorageMetricsTest {
                 new 
ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(Path.fromLocalFile(file), false, 
100, metrics)) {
-            FsStateChangelogWriter writer = storage.createWriter("writer", 
EMPTY_KEY_GROUP_RANGE);
+            FsStateChangelogWriter writer = createWriter(storage);
 
             int numUploads = 5;
             for (int i = 0; i < numUploads; i++) {
@@ -149,7 +150,9 @@ public class ChangelogStorageMetricsTest {
         FsStateChangelogStorage storage = new FsStateChangelogStorage(batcher, 
Integer.MAX_VALUE);
         FsStateChangelogWriter[] writers = new 
FsStateChangelogWriter[numWriters];
         for (int i = 0; i < numWriters; i++) {
-            writers[i] = storage.createWriter(Integer.toString(i), 
EMPTY_KEY_GROUP_RANGE);
+            writers[i] =
+                    storage.createWriter(
+                            Integer.toString(i), EMPTY_KEY_GROUP_RANGE, new 
SyncMailboxExecutor());
         }
 
         try {
@@ -190,7 +193,7 @@ public class ChangelogStorageMetricsTest {
                         metrics);
 
         FsStateChangelogStorage storage = new FsStateChangelogStorage(batcher, 
Integer.MAX_VALUE);
-        FsStateChangelogWriter writer = storage.createWriter("writer", 
EMPTY_KEY_GROUP_RANGE);
+        FsStateChangelogWriter writer = createWriter(storage);
 
         try {
             for (int upload = 0; upload < numUploads; upload++) {
@@ -242,7 +245,7 @@ public class ChangelogStorageMetricsTest {
                         metrics);
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(batcher, Long.MAX_VALUE)) {
-            FsStateChangelogWriter writer = storage.createWriter("writer", 
EMPTY_KEY_GROUP_RANGE);
+            FsStateChangelogWriter writer = createWriter(storage);
             int numUploads = 11;
             for (int i = 0; i < numUploads; i++) {
                 SequenceNumber from = writer.nextSequenceNumber();
@@ -288,4 +291,8 @@ public class ChangelogStorageMetricsTest {
             attemptsPerTask.clear();
         }
     }
+
+    private FsStateChangelogWriter createWriter(FsStateChangelogStorage 
storage) {
+        return storage.createWriter("writer", EMPTY_KEY_GROUP_RANGE, new 
SyncMailboxExecutor());
+    }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
index 6179be6..602155d 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
@@ -17,14 +17,24 @@
 
 package org.apache.flink.changelog.fs;
 
+import 
org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.BlockingUploader;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import 
org.apache.flink.runtime.state.changelog.inmemory.StateChangelogStorageTest;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 
 import static 
org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
 
@@ -46,4 +56,62 @@ public class FsStateChangelogStorageTest extends 
StateChangelogStorageTest {
                 1024 * 1024 * 10,
                 createUnregisteredChangelogStorageMetricGroup());
     }
+
+    /**
+     * Provoke a deadlock between task and uploader threads which might happen 
during waiting for
+     * capacity and upload completion.
+     */
+    @Test
+    public void testDeadlockOnUploadCompletion() throws Throwable {
+        int capacity = 10; // in bytes, allow the first two uploads without 
waiting (see below)
+        CountDownLatch remainingUploads = new CountDownLatch(3);
+        BlockingUploader blockingUploader = new BlockingUploader();
+        CompletableFuture<Void> unblockFuture = new CompletableFuture<>();
+        new Thread(
+                        () -> {
+                            try {
+                                remainingUploads.await();
+                                blockingUploader.unblock();
+                                unblockFuture.complete(null);
+                            } catch (Throwable e) {
+                                unblockFuture.completeExceptionally(e);
+                            }
+                        })
+                .start();
+        MailboxExecutorImpl mailboxExecutor =
+                new MailboxExecutorImpl(
+                        new TaskMailboxImpl(), 0, 
StreamTaskActionExecutor.IMMEDIATE);
+        try (BatchingStateChangeUploadScheduler scheduler =
+                        new BatchingStateChangeUploadScheduler(
+                                0, // schedule immediately
+                                0, // schedule immediately
+                                RetryPolicy.NONE,
+                                blockingUploader,
+                                1,
+                                capacity,
+                                
createUnregisteredChangelogStorageMetricGroup()) {
+                            @Override
+                            public void upload(UploadTask uploadTask) throws 
IOException {
+                                remainingUploads.countDown();
+                                super.upload(uploadTask);
+                            }
+                        };
+                StateChangelogWriter<?> writer =
+                        new FsStateChangelogStorage(scheduler, 0 /* persist 
immediately */)
+                                .createWriter(
+                                        new OperatorID().toString(),
+                                        KeyGroupRange.of(0, 0),
+                                        mailboxExecutor); ) {
+            // 1. start with 1-byte request - releasing only it will NOT allow 
proceeding in 3, but
+            // still involves completion callback, which can deadlock
+            writer.append(0, new byte[1]);
+            // 2. exceed capacity
+            writer.append(0, new byte[capacity]);
+            // 3. current thread will block until both previous requests are 
completed
+            // verify that completion can proceed while this thread is waiting
+            writer.append(0, new byte[1]);
+        }
+        // check unblocking thread exit status
+        unblockFuture.join();
+    }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index 4d4a37d..f3e576d 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -86,7 +87,8 @@ public class FsStateChangelogWriterSqnTest {
                         KeyGroupRange.of(0, 0),
                         StateChangeUploadScheduler.directScheduler(
                                 new TestingStateChangeUploader()),
-                        Long.MAX_VALUE)) {
+                        Long.MAX_VALUE,
+                        new SyncMailboxExecutor())) {
             if (test.withAppend) {
                 append(writer);
             }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index 64addf7..b0a49dd 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
@@ -211,7 +212,8 @@ public class FsStateChangelogWriterTest {
                         UUID.randomUUID(),
                         KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
                         StateChangeUploadScheduler.directScheduler(uploader),
-                        appendPersistThreshold)) {
+                        appendPersistThreshold,
+                        new SyncMailboxExecutor())) {
             test.accept(writer, uploader);
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
index 0bc26e3..14794b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.changelog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.state.KeyGroupRange;
 
@@ -30,7 +31,8 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 @Internal
 public interface StateChangelogStorage<Handle extends ChangelogStateHandle> 
extends AutoCloseable {
 
-    StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange 
keyGroupRange);
+    StateChangelogWriter<Handle> createWriter(
+            String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor 
mailboxExecutor);
 
     StateChangelogHandleReader<Handle> createReader();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
index c9bfdfb..6f49407 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.state.changelog.inmemory;
 
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -28,7 +29,7 @@ public class InMemoryStateChangelogStorage
 
     @Override
     public InMemoryStateChangelogWriter createWriter(
-            String operatorID, KeyGroupRange keyGroupRange) {
+            String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor 
mailboxExecutor) {
         return new InMemoryStateChangelogWriter(keyGroupRange);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
index d3f1767..db3502a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.plugin.PluginManager;
@@ -166,7 +167,7 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
 
         @Override
         public StateChangelogWriter<ChangelogStateHandle> createWriter(
-                String operatorID, KeyGroupRange keyGroupRange) {
+                String operatorID, KeyGroupRange keyGroupRange, 
MailboxExecutor mailboxExecutor) {
             return null;
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 6600ff4..8fdfc26 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.state.changelog.inmemory;
 
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.plugin.PluginManager;
@@ -92,7 +93,7 @@ public class StateChangelogStorageLoaderTest {
             implements StateChangelogStorage<ChangelogStateHandle> {
         @Override
         public StateChangelogWriter<ChangelogStateHandle> createWriter(
-                String operatorID, KeyGroupRange keyGroupRange) {
+                String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
             return null;
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index d5e3303..d7ddf40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state.changelog.inmemory;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
@@ -58,7 +59,11 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
     @Test(expected = IllegalStateException.class)
     public void testNoAppendAfterClose() throws IOException {
         StateChangelogWriter<?> writer =
-                getFactory().createWriter(new OperatorID().toString(), KeyGroupRange.of(0, 0));
+                getFactory()
+                        .createWriter(
+                                new OperatorID().toString(),
+                                KeyGroupRange.of(0, 0),
+                                new SyncMailboxExecutor());
         writer.close();
         writer.append(0, new byte[0]);
     }
@@ -70,7 +75,8 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
 
         try (StateChangelogStorage<T> client = getFactory();
                 StateChangelogWriter<T> writer =
-                        client.createWriter(new OperatorID().toString(), kgRange)) {
+                        client.createWriter(
+                                new OperatorID().toString(), kgRange, new SyncMailboxExecutor())) {
             SequenceNumber prev = writer.initialSequenceNumber();
             for (Map.Entry<Integer, List<byte[]>> entry : appendsByKeyGroup.entrySet()) {
                 Integer group = entry.getKey();
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index 863a533..afcd17a 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -234,7 +234,9 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab
                                         executionConfig,
                                         ttlTimeProvider,
                                         changelogStorage.createWriter(
-                                                operatorIdentifier, keyGroupRange),
+                                                operatorIdentifier,
+                                                keyGroupRange,
+                                                env.getMainMailboxExecutor()),
                                         baseState,
                                         env.getCheckpointStorageAccess()));
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
index 4f9c910..264fb6a 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
@@ -109,7 +109,7 @@ public class ChangelogKeyedStateBackendTest {
                 new ExecutionConfig(),
                 TtlTimeProvider.DEFAULT,
                 new InMemoryStateChangelogStorage()
-                        .createWriter("test", KeyGroupRange.EMPTY_KEY_GROUP_RANGE),
+                        .createWriter("test", KeyGroupRange.EMPTY_KEY_GROUP_RANGE, null),
                 emptyList(),
                 new DummyCheckpointingStorageAccess());
     }

Reply via email to