This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1bd897c [SPARK-32918][SHUFFLE] RPC implementation to support control
plane coordination for push-based shuffle
1bd897c is described below
commit 1bd897cbc4fe30eb8b7740c7232aae87081e8e33
Author: Ye Zhou <[email protected]>
AuthorDate: Mon Nov 23 15:16:20 2020 -0600
[SPARK-32918][SHUFFLE] RPC implementation to support control plane
coordination for push-based shuffle
### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for
push-based shuffle.
Summary of changes:
This PR introduces a new RPC to be called within the driver. Once the expected
shuffle push wait time has elapsed, the driver will call this RPC to facilitate
coordination of shuffle map/reduce stages and notify external shuffle services
to finalize the shuffle block merge for a given shuffle. Shuffle services also
respond to the caller with the metadata about the merged shuffle partitions.
### Why are the changes needed?
Refer to the SPIP in SPARK-30602.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This code snippet won't be called by any existing code and will be tested
after the coordinated driver changes get merged in SPARK-32920.
Lead-authored-by: Min Shen mshenlinkedin.com
Closes #30163 from zhouyejoe/SPARK-32918.
Lead-authored-by: Ye Zhou <[email protected]>
Co-authored-by: Min Shen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../spark/network/shuffle/BlockStoreClient.java | 22 +++++++++++
.../network/shuffle/ExternalBlockStoreClient.java | 29 +++++++++++++++
.../network/shuffle/MergeFinalizerListener.java | 43 ++++++++++++++++++++++
3 files changed, 94 insertions(+)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 37befcd..a6bdc13 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -147,6 +147,8 @@ public abstract class BlockStoreClient implements Closeable
{
* @param blockIds block ids to be pushed
* @param buffers buffers to be pushed
* @param listener the listener to receive block push status.
+ *
+ * @since 3.1.0
*/
public void pushBlocks(
String host,
@@ -156,4 +158,24 @@ public abstract class BlockStoreClient implements
Closeable {
BlockFetchingListener listener) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Invoked by the Spark driver to notify external shuffle services to finalize the shuffle merge
+ * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
+ * finishing the shuffle merge process associated with the shuffle mapper stage.
+ *
+ * @param host host of shuffle server
+ * @param port port of shuffle server
+ * @param shuffleId shuffle ID of the shuffle to be finalized
+ * @param listener the listener to receive MergeStatuses
+ *
+ * @since 3.1.0
+ */
+ public void finalizeShuffleMerge(
+ String host,
+ int port,
+ int shuffleId,
+ MergeFinalizerListener listener) {
+ throw new UnsupportedOperationException(); // not supported by default; subclasses override
+ }
}
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index eca35ed..56c06e6 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -159,6 +159,35 @@ public class ExternalBlockStoreClient extends
BlockStoreClient {
}
@Override
+ public void finalizeShuffleMerge(
+ String host,
+ int port,
+ int shuffleId,
+ MergeFinalizerListener listener) {
+ checkInit();
+ try {
+ TransportClient client = clientFactory.createClient(host, port);
+ ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId,
shuffleId).toByteBuffer();
+ client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
+ @Override
+ public void onSuccess(ByteBuffer response) { // decode the reply as MergeStatuses for the listener
+ listener.onShuffleMergeSuccess(
+ (MergeStatuses)
BlockTransferMessage.Decoder.fromByteBuffer(response));
+ }
+
+ @Override
+ public void onFailure(Throwable e) { // forward the RPC failure to the listener as-is
+ listener.onShuffleMergeFailure(e);
+ }
+ });
+ } catch (Exception e) { // exceptions from the try block are logged and reported to the listener
+ logger.error("Exception while sending finalizeShuffleMerge request to
{}:{}",
+ host, port, e);
+ listener.onShuffleMergeFailure(e);
+ }
+ }
+
+ @Override
public MetricSet shuffleMetrics() {
checkInit();
return clientFactory.getAllMetrics();
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
new file mode 100644
index 0000000..08e13ee
--- /dev/null
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Listener providing callback functions to invoke when the driver receives the response for the
+ * finalize shuffle merge request sent to a remote shuffle service.
+ *
+ * @since 3.1.0
+ */
+public interface MergeFinalizerListener extends EventListener {
+ /**
+ * Called once upon successful response on finalize shuffle merge on a remote shuffle service.
+ * The returned {@link MergeStatuses} is passed to the listener for further processing.
+ */
+ void onShuffleMergeSuccess(MergeStatuses statuses);
+
+ /**
+ * Called once upon failure response on finalize shuffle merge on a remote shuffle service.
+ */
+ void onShuffleMergeFailure(Throwable e);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]