This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b442394c65b [FLINK-28785][network] Hybrid shuffle consumer thread and 
upstream thread may have deadlock.
b442394c65b is described below

commit b442394c65b1e924c4b5710a9453e3c7eacf05b5
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Aug 3 14:24:21 2022 +0800

    [FLINK-28785][network] Hybrid shuffle consumer thread and upstream thread 
may have deadlock.
    
    In hybrid shuffle mode, the consumer thread acquires the subpartition view 
lock and then waits for the read lock of MemoryDataManager. Meanwhile, 
MemoryDataManager may acquire the write lock to make a global spilling decision 
and then wait for the subpartition view lock to get the consuming offset. In 
this case, a deadlock will occur.
    
    consumer thread : acquire subpartition lock -> wait read lock.
    
    upstream thread  : acquire write lock -> wait subpartition lock.
    
    This closes #20456
---
 .../partition/hybrid/HsMemoryDataManager.java      |  6 +-
 .../hybrid/HsSubpartitionFileReaderImpl.java       |  4 +-
 .../partition/hybrid/HsSubpartitionView.java       |  6 +-
 .../HsSubpartitionViewInternalOperations.java      | 11 +++-
 .../hybrid/HsSubpartitionFileReaderImplTest.java   |  2 +-
 .../partition/hybrid/HsSubpartitionViewTest.java   | 66 +++++++++++++++++++++-
 .../TestingSubpartitionViewInternalOperation.java  |  2 +-
 7 files changed, 87 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index b887a157cfa..d7f0e2b346d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -190,7 +190,11 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
         for (int channel = 0; channel < numSubpartitions; channel++) {
             HsSubpartitionViewInternalOperations viewOperation =
                     subpartitionViewOperationsMap.get(channel);
-            consumeIndexes.add(viewOperation == null ? -1 : 
viewOperation.getConsumingOffset() + 1);
+            // Access consuming offset without lock to prevent deadlock.
+            // A consuming thread may be blocked on the memory data manager 
lock, while holding
+            // the viewOperation lock.
+            consumeIndexes.add(
+                    viewOperation == null ? -1 : 
viewOperation.getConsumingOffset(false) + 1);
         }
         return consumeIndexes;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index a355765ac35..049eed5f8f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -184,7 +184,9 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
     /** Refresh downstream consumption progress for another round scheduling 
of reading. */
     @Override
     public void prepareForScheduling() {
-        bufferIndexManager.updateLastConsumed(operations.getConsumingOffset());
+        // Access the consuming offset with lock, to prevent loading any 
buffer released from the
+        // memory data manager that is already consumed.
+        
bufferIndexManager.updateLastConsumed(operations.getConsumingOffset(true));
     }
 
     /** Provides priority calculation logic for io scheduler. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index b7af284287b..a56947b0e01 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -134,8 +134,12 @@ public class HsSubpartitionView
         }
     }
 
+    @SuppressWarnings("FieldAccessNotGuarded")
     @Override
-    public int getConsumingOffset() {
+    public int getConsumingOffset(boolean withLock) {
+        if (!withLock) {
+            return lastConsumedBufferIndex;
+        }
         synchronized (lock) {
             return lastConsumedBufferIndex;
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
index 053e5a291ba..ab967e3d450 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
@@ -27,6 +27,13 @@ public interface HsSubpartitionViewInternalOperations {
     /** Callback for new data become available. */
     void notifyDataAvailable();
 
-    /** Get the latest consuming offset of the subpartition. */
-    int getConsumingOffset();
+    /**
+     * Get the latest consuming offset of the subpartition.
+     *
+     * @param withLock If false, read the consuming offset outside the guarding 
of lock. This is
+     *     sometimes desired to avoid lock contention, if the caller does not 
depend on any other
+     *     states to change atomically with the consuming offset.
+     * @return latest consuming offset.
+     */
+    int getConsumingOffset(boolean withLock);
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 30535926437..5782d5a510c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -176,7 +176,7 @@ class HsSubpartitionFileReaderImplTest {
 
         subpartitionOperation.advanceConsumptionProgress();
         subpartitionOperation.advanceConsumptionProgress();
-        assertThat(subpartitionOperation.getConsumingOffset()).isEqualTo(1);
+        
assertThat(subpartitionOperation.getConsumingOffset(true)).isEqualTo(1);
         // update consumptionProgress
         subpartitionFileReader.prepareForScheduling();
         // read buffer, expected buffer with index: 2
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index b3331b12dbc..659984f2f0a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -18,15 +18,22 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import 
org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionView.AvailabilityWithBacklog;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -63,6 +70,59 @@ class HsSubpartitionViewTest {
         assertThat(nextBuffer).isSameAs(bufferAndBacklog);
     }
 
+    @Test
+    @Timeout(60)
+    void testDeadLock(@TempDir Path dataFilePath) throws Exception {
+        final int bufferSize = 16;
+        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 
bufferSize);
+        BufferPool bufferPool = networkBufferPool.createBufferPool(10, 10);
+        HsSubpartitionView subpartitionView = createSubpartitionView();
+
+        CompletableFuture<Void> acquireWriteLock = new CompletableFuture<>();
+
+        CheckedThread consumerThread =
+                new CheckedThread() {
+                    @Override
+                    public void go() throws Exception {
+                        // block until the other thread acquires the write lock.
+                        acquireWriteLock.get();
+                        subpartitionView.getNextBuffer();
+                    }
+                };
+
+        TestingSpillingStrategy spillingStrategy =
+                TestingSpillingStrategy.builder()
+                        .setOnMemoryUsageChangedFunction((ignore1, ignore2) -> 
Optional.empty())
+                        .setDecideActionWithGlobalInfoFunction(
+                                (spillingInfoProvider) -> {
+                                    acquireWriteLock.complete(null);
+                                    try {
+                                        consumerThread.trySync(10);
+                                    } catch (Exception e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                    
spillingInfoProvider.getNextBufferIndexToConsume();
+                                    return 
HsSpillingStrategy.Decision.NO_ACTION;
+                                })
+                        .build();
+        HsMemoryDataManager memoryDataManager =
+                new HsMemoryDataManager(
+                        1,
+                        bufferSize,
+                        bufferPool,
+                        spillingStrategy,
+                        new HsFileDataIndexImpl(1),
+                        dataFilePath.resolve(".data"),
+                        null);
+        HsDataView hsDataView = memoryDataManager.registerSubpartitionView(0, 
subpartitionView);
+        subpartitionView.setMemoryDataView(hsDataView);
+        subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+
+        consumerThread.start();
+        // trigger request buffer.
+        memoryDataManager.append(ByteBuffer.allocate(bufferSize), 0, 
DataType.DATA_BUFFER);
+    }
+
     @Test
     void testGetNextBufferFromDiskNextDataTypeIsNone() {
         HsSubpartitionView subpartitionView = createSubpartitionView();
@@ -308,11 +368,11 @@ class HsSubpartitionViewTest {
         subpartitionView.setDiskDataView(diskDataView);
         subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
 
-        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(-1);
+        assertThat(subpartitionView.getConsumingOffset(true)).isEqualTo(-1);
         subpartitionView.getNextBuffer();
-        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(0);
+        assertThat(subpartitionView.getConsumingOffset(true)).isEqualTo(0);
         subpartitionView.getNextBuffer();
-        assertThat(subpartitionView.getConsumingOffset()).isEqualTo(1);
+        assertThat(subpartitionView.getConsumingOffset(true)).isEqualTo(1);
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
index 7ac32be04f9..ff2f8250b2a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
@@ -32,7 +32,7 @@ public class TestingSubpartitionViewInternalOperation
     }
 
     @Override
-    public int getConsumingOffset() {
+    public int getConsumingOffset(boolean withLock) {
         return consumingOffset;
     }
 

Reply via email to