This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dc995f2 [SPARK-38683][SHUFFLE] It is unnecessary to release the
ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or
ManagedBufferIterator buffers when the client channel's connection is terminated
dc995f2 is described below
commit dc995f29592c720b959c877e31133bcd706d9b4e
Author: weixiuli <[email protected]>
AuthorDate: Fri Apr 1 10:06:19 2022 +0800
[SPARK-38683][SHUFFLE] It is unnecessary to release the
ShuffleManagedBufferIterator or ShuffleChunkManagedBufferIterator or
ManagedBufferIterator buffers when the client channel's connection is terminated
### What changes were proposed in this pull request?
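Stop releasing the buffers of ShuffleManagedBufferIterator,
ShuffleChunkManagedBufferIterator, and ManagedBufferIterator when the
client channel's connection is terminated. This is done by adding an
`isBufferMaterializedOnNext` flag to `OneForOneStreamManager.registerStream`;
`ExternalBlockHandler` passes `true` for these iterators.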
If a client connection is closed before the iterator is fully drained,
the remaining materialized buffers should all be released. However, the
buffers produced by `ShuffleManagedBufferIterator`,
`ShuffleChunkManagedBufferIterator`, and `ManagedBufferIterator` are not
materialized until the iterator is traversed by calling next(), so we should
not traverse and release them: doing so would trigger unnecessary buffer
materialization, which could be I/O based.
### Why are the changes needed?
To reduce I/O operations for the External Shuffle Service.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests, plus a new test added to OneForOneStreamManagerSuite.
Closes #36000 from weixiuli/SPARK-38683-unnecessary-release.
Authored-by: weixiuli <[email protected]>
Signed-off-by: yi.wu <[email protected]>
---
.../network/server/OneForOneStreamManager.java | 34 ++++++++++++++++++----
.../server/OneForOneStreamManagerSuite.java | 33 +++++++++++++++++++++
.../network/shuffle/ExternalBlockHandler.java | 4 +--
.../network/shuffle/ExternalBlockHandlerSuite.java | 5 ++--
4 files changed, 66 insertions(+), 10 deletions(-)
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index dfa31c0..ace409e 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -51,6 +51,12 @@ public class OneForOneStreamManager extends StreamManager {
// The channel associated to the stream
final Channel associatedChannel;
+ // Indicates whether the buffers are only materialized when next() is
called. Some buffers like
+ // ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator,
ManagedBufferIterator are
+ // not materialized until the iterator is traversed by calling next(). We
use it to decide
+ // whether buffers should be released at connectionTerminated() in order
to avoid unnecessary
+ // buffer materialization, which could be I/O based.
+ final boolean isBufferMaterializedOnNext;
// Used to keep track of the index of the buffer that the user has
retrieved, just to ensure
// that the caller only requests each chunk one at a time, in order.
@@ -59,10 +65,15 @@ public class OneForOneStreamManager extends StreamManager {
// Used to keep track of the number of chunks being transferred and not
finished yet.
final AtomicLong chunksBeingTransferred = new AtomicLong(0L);
- StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel
channel) {
+ StreamState(
+ String appId,
+ Iterator<ManagedBuffer> buffers,
+ Channel channel,
+ boolean isBufferMaterializedOnNext) {
this.appId = appId;
this.buffers = Preconditions.checkNotNull(buffers);
this.associatedChannel = channel;
+ this.isBufferMaterializedOnNext = isBufferMaterializedOnNext;
}
}
@@ -130,7 +141,7 @@ public class OneForOneStreamManager extends StreamManager {
try {
// Release all remaining buffers.
- while (state.buffers.hasNext()) {
+ while (!state.isBufferMaterializedOnNext && state.buffers.hasNext())
{
ManagedBuffer buffer = state.buffers.next();
if (buffer != null) {
buffer.release();
@@ -205,8 +216,11 @@ public class OneForOneStreamManager extends StreamManager {
/**
* Registers a stream of ManagedBuffers which are served as individual
chunks one at a time to
* callers. Each ManagedBuffer will be release()'d after it is transferred
on the wire. If a
- * client connection is closed before the iterator is fully drained, then
the remaining buffers
- * will all be release()'d.
+ * client connection is closed before the iterator is fully drained, then
the remaining
+ * materialized buffers will all be release()'d. Buffers produced by
+ * ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator, and
ManagedBufferIterator are
+ * not released, because they are not materialized until the iterator's
next() method
+ * is called.
*
* If an app ID is provided, only callers who've authenticated with the
given app ID will be
* allowed to fetch from this stream.
@@ -215,12 +229,20 @@ public class OneForOneStreamManager extends StreamManager
{
* to be the only reader of the stream. Once the connection is closed, the
stream will never
* be used again, enabling cleanup by `connectionTerminated`.
*/
- public long registerStream(String appId, Iterator<ManagedBuffer> buffers,
Channel channel) {
+ public long registerStream(
+ String appId,
+ Iterator<ManagedBuffer> buffers,
+ Channel channel,
+ boolean isBufferMaterializedOnNext) {
long myStreamId = nextStreamId.getAndIncrement();
- streams.put(myStreamId, new StreamState(appId, buffers, channel));
+ streams.put(myStreamId, new StreamState(appId, buffers, channel,
isBufferMaterializedOnNext));
return myStreamId;
}
+ public long registerStream(String appId, Iterator<ManagedBuffer> buffers,
Channel channel) {
+ return registerStream(appId, buffers, channel, false);
+ }
+
@VisibleForTesting
public int numStreamStates() {
return streams.size();
diff --git
a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
index b65daaf..54f2617 100644
---
a/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
+++
b/common/network-common/src/test/java/org/apache/spark/network/server/OneForOneStreamManagerSuite.java
@@ -126,4 +126,37 @@ public class OneForOneStreamManagerSuite {
Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
Assert.assertEquals(0, manager.numStreamStates());
}
+
+ @Test
+ public void streamStatesAreFreeOrNotWhenConnectionIsClosed() {
+ OneForOneStreamManager manager = new OneForOneStreamManager();
+ ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class);
+
+ Iterator<ManagedBuffer> buffers1 = Mockito.mock(Iterator.class);
+ Mockito.when(buffers1.hasNext()).thenReturn(true).thenReturn(false);
+ Mockito.when(buffers1.next()).thenReturn(mockManagedBuffer);
+
+ Iterator<ManagedBuffer> buffers2 = Mockito.mock(Iterator.class);
+ Mockito.when(buffers2.hasNext()).thenReturn(true);
+ Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer);
+
+ Channel dummyChannel = Mockito.mock(Channel.class,
Mockito.RETURNS_SMART_NULLS);
+ // should Release,
+ manager.registerStream("appId", buffers1, dummyChannel, false);
+ // should NOT Release
+ manager.registerStream("appId", buffers2, dummyChannel, true);
+ Assert.assertEquals(2, manager.numStreamStates());
+
+ // connectionTerminated
+ manager.connectionTerminated(dummyChannel);
+
+ Mockito.verify(buffers1, Mockito.times(2)).hasNext();
+ Mockito.verify(buffers1, Mockito.times(1)).next();
+
+ Mockito.verify(buffers2, Mockito.times(0)).hasNext();
+ Mockito.verify(buffers2, Mockito.times(0)).next();
+ // only buffers1 has been released
+ Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
+ Assert.assertEquals(0, manager.numStreamStates());
+ }
}
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 52bc0f9c..9b2e7a4 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -157,14 +157,14 @@ public class ExternalBlockHandler extends RpcHandler
iterator = new
ShuffleChunkManagedBufferIterator((FetchShuffleBlockChunks) msgObj);
}
streamId = streamManager.registerStream(client.getClientId(),
iterator,
- client.getChannel());
+ client.getChannel(), true);
} else {
// For the compatibility with the old version, still keep the
support for OpenBlocks.
OpenBlocks msg = (OpenBlocks) msgObj;
numBlockIds = msg.blockIds.length;
checkAuth(client, msg.appId);
streamId = streamManager.registerStream(client.getClientId(),
- new ManagedBufferIterator(msg), client.getChannel());
+ new ManagedBufferIterator(msg), client.getChannel(), true);
}
if (logger.isTraceEnabled()) {
logger.trace(
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index 14896c8..18efc10 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -302,7 +302,7 @@ public class ExternalBlockHandlerSuite {
ArgumentCaptor<Iterator<ManagedBuffer>> stream =
(ArgumentCaptor<Iterator<ManagedBuffer>>)
(ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
verify(streamManager, times(1)).registerStream(anyString(),
stream.capture(),
- any());
+ any(), anyBoolean());
Iterator<ManagedBuffer> buffers = stream.getValue();
for (ManagedBuffer blockMarker : blockMarkers) {
assertEquals(blockMarker, buffers.next());
@@ -451,7 +451,8 @@ public class ExternalBlockHandlerSuite {
@SuppressWarnings("unchecked")
ArgumentCaptor<Iterator<ManagedBuffer>> stream =
(ArgumentCaptor<Iterator<ManagedBuffer>>)
(ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
- verify(streamManager, times(1)).registerStream(any(), stream.capture(),
any());
+ verify(streamManager, times(1)).registerStream(any(), stream.capture(),
+ any(), anyBoolean());
Iterator<ManagedBuffer> bufferIter = stream.getValue();
for (int reduceId = 0; reduceId < 2; reduceId++) {
for (int chunkId = 0; chunkId < 2; chunkId++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]