This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b442394c65b [FLINK-28785][network] Hybrid shuffle consumer thread and
upstream thread may have deadlock.
b442394c65b is described below
commit b442394c65b1e924c4b5710a9453e3c7eacf05b5
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Aug 3 14:24:21 2022 +0800
[FLINK-28785][network] Hybrid shuffle consumer thread and upstream thread
may have deadlock.
In hybrid shuffle mode, the subpartition view lock is acquired by the consumer
thread, which then waits for the read lock of MemoryDataManager. But
MemoryDataManager may acquire the write lock to make a global spilling decision,
and then wait for the subpartition view lock to get the consuming offset. In this
case, a deadlock occurs.
consumer thread: acquire subpartition view lock -> wait for read lock.
upstream thread: acquire write lock -> wait for subpartition view lock.
This closes #20456
---
.../partition/hybrid/HsMemoryDataManager.java | 6 +-
.../hybrid/HsSubpartitionFileReaderImpl.java | 4 +-
.../partition/hybrid/HsSubpartitionView.java | 6 +-
.../HsSubpartitionViewInternalOperations.java | 11 +++-
.../hybrid/HsSubpartitionFileReaderImplTest.java | 2 +-
.../partition/hybrid/HsSubpartitionViewTest.java | 66 +++++++++++++++++++++-
.../TestingSubpartitionViewInternalOperation.java | 2 +-
7 files changed, 87 insertions(+), 10 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index b887a157cfa..d7f0e2b346d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -190,7 +190,11 @@ public class HsMemoryDataManager implements
HsSpillingInfoProvider, HsMemoryData
for (int channel = 0; channel < numSubpartitions; channel++) {
HsSubpartitionViewInternalOperations viewOperation =
subpartitionViewOperationsMap.get(channel);
- consumeIndexes.add(viewOperation == null ? -1 :
viewOperation.getConsumingOffset() + 1);
+ // Read the consuming offset without taking the view lock to prevent a deadlock: a
+ // consuming thread may hold the viewOperation lock while being blocked on the memory
+ // data manager lock.
+ consumeIndexes.add(
+ viewOperation == null ? -1 :
viewOperation.getConsumingOffset(false) + 1);
}
return consumeIndexes;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index a355765ac35..049eed5f8f6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -184,7 +184,9 @@ public class HsSubpartitionFileReaderImpl implements
HsSubpartitionFileReader {
/** Refresh downstream consumption progress for another round scheduling
of reading. */
@Override
public void prepareForScheduling() {
- bufferIndexManager.updateLastConsumed(operations.getConsumingOffset());
+ // Read the consuming offset while holding the view lock, so that buffers which were released
+ // from the memory data manager but have already been consumed are not loaded by this reader.
+
bufferIndexManager.updateLastConsumed(operations.getConsumingOffset(true));
}
/** Provides priority calculation logic for io scheduler. */
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
index b7af284287b..a56947b0e01 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java
@@ -134,8 +134,12 @@ public class HsSubpartitionView
}
}
+ @SuppressWarnings("FieldAccessNotGuarded") // the !withLock path below reads without the lock
@Override
- public int getConsumingOffset() {
+ public int getConsumingOffset(boolean withLock) {
+ if (!withLock) {
+ return lastConsumedBufferIndex; // NOTE(review): may be stale unless the field is volatile
+ }
synchronized (lock) {
return lastConsumedBufferIndex;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
index 053e5a291ba..ab967e3d450 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
@@ -27,6 +27,13 @@ public interface HsSubpartitionViewInternalOperations {
/** Callback for new data become available. */
void notifyDataAvailable();
- /** Get the latest consuming offset of the subpartition. */
- int getConsumingOffset();
+ /**
+ * Get the latest consuming offset of the subpartition.
+ *
+ * @param withLock If true, read the consuming offset under the lock; otherwise read it without
+ * the lock, which is sometimes desired to avoid lock contention or deadlock if the caller does
+ * not depend on any other states to change atomically with the consuming offset.
+ * @return latest consuming offset.
+ */
+ int getConsumingOffset(boolean withLock);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index 30535926437..5782d5a510c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -176,7 +176,7 @@ class HsSubpartitionFileReaderImplTest {
subpartitionOperation.advanceConsumptionProgress();
subpartitionOperation.advanceConsumptionProgress();
- assertThat(subpartitionOperation.getConsumingOffset()).isEqualTo(1);
+
assertThat(subpartitionOperation.getConsumingOffset(true)).isEqualTo(1);
// update consumptionProgress
subpartitionFileReader.prepareForScheduling();
// read buffer, expected buffer with index: 2
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
index b3331b12dbc..659984f2f0a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java
@@ -18,15 +18,22 @@
package org.apache.flink.runtime.io.network.partition.hybrid;
+import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import
org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import
org.apache.flink.runtime.io.network.partition.ResultSubpartitionView.AvailabilityWithBacklog;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,6 +70,59 @@ class HsSubpartitionViewTest {
assertThat(nextBuffer).isSameAs(bufferAndBacklog);
}
+ @Test
+ @Timeout(60)
+ void testDeadLock(@TempDir Path dataFilePath) throws Exception {
+ final int bufferSize = 16;
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(10,
bufferSize);
+ BufferPool bufferPool = networkBufferPool.createBufferPool(10, 10);
+ HsSubpartitionView subpartitionView = createSubpartitionView();
+
+ CompletableFuture<Void> acquireWriteLock = new CompletableFuture<>();
+
+ CheckedThread consumerThread =
+ new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ // Block until the upstream thread has acquired the write lock.
+ acquireWriteLock.get();
+ subpartitionView.getNextBuffer();
+ }
+ };
+
+ TestingSpillingStrategy spillingStrategy =
+ TestingSpillingStrategy.builder()
+ .setOnMemoryUsageChangedFunction((ignore1, ignore2) ->
Optional.empty())
+ .setDecideActionWithGlobalInfoFunction(
+ (spillingInfoProvider) -> {
+ acquireWriteLock.complete(null);
+ try {
+ consumerThread.trySync(10);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
spillingInfoProvider.getNextBufferIndexToConsume();
+ return
HsSpillingStrategy.Decision.NO_ACTION;
+ })
+ .build();
+ HsMemoryDataManager memoryDataManager =
+ new HsMemoryDataManager(
+ 1,
+ bufferSize,
+ bufferPool,
+ spillingStrategy,
+ new HsFileDataIndexImpl(1),
+ dataFilePath.resolve(".data"),
+ null);
+ HsDataView hsDataView = memoryDataManager.registerSubpartitionView(0,
subpartitionView);
+ subpartitionView.setMemoryDataView(hsDataView);
+ subpartitionView.setDiskDataView(TestingHsDataView.NO_OP);
+
+ consumerThread.start();
+ // Trigger a buffer request. NOTE(review): consumerThread is never sync()'d; its errors may go unreported.
+ memoryDataManager.append(ByteBuffer.allocate(bufferSize), 0,
DataType.DATA_BUFFER);
+ }
+
@Test
void testGetNextBufferFromDiskNextDataTypeIsNone() {
HsSubpartitionView subpartitionView = createSubpartitionView();
@@ -308,11 +368,11 @@ class HsSubpartitionViewTest {
subpartitionView.setDiskDataView(diskDataView);
subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP);
- assertThat(subpartitionView.getConsumingOffset()).isEqualTo(-1);
+ assertThat(subpartitionView.getConsumingOffset(true)).isEqualTo(-1);
subpartitionView.getNextBuffer();
- assertThat(subpartitionView.getConsumingOffset()).isEqualTo(0);
+ assertThat(subpartitionView.getConsumingOffset(true)).isEqualTo(0);
subpartitionView.getNextBuffer();
- assertThat(subpartitionView.getConsumingOffset()).isEqualTo(1);
+ assertThat(subpartitionView.getConsumingOffset(true)).isEqualTo(1);
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
index 7ac32be04f9..ff2f8250b2a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
@@ -32,7 +32,7 @@ public class TestingSubpartitionViewInternalOperation
}
@Override
- public int getConsumingOffset() {
+ public int getConsumingOffset(boolean withLock) { // withLock is ignored by this stub
return consumingOffset;
}