This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2270ecf [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level
user documentation
2270ecf is described below
commit 2270ecf32f7ae478570145219d2ce71a642076cf
Author: Venkata krishnan Sowrirajan <[email protected]>
AuthorDate: Mon Aug 16 10:24:40 2021 -0500
[SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation
### What changes were proposed in this pull request?
Document the push-based shuffle feature with a high level overview of the
feature and corresponding configuration options for both shuffle server side as
well as client side. This is how the changes to the doc look in the browser
([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png))
### Why are the changes needed?
Helps users understand the feature
### Does this PR introduce _any_ user-facing change?
Docs
### How was this patch tested?
N/A
Closes #33615 from venkata91/SPARK-36374.
Authored-by: Venkata krishnan Sowrirajan <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../apache/spark/network/util/TransportConf.java | 28 ++++--
.../shuffle/RemoteBlockPushResolverSuite.java | 2 +-
.../org/apache/spark/internal/config/package.scala | 63 ++++++------
docs/configuration.md | 106 +++++++++++++++++++++
4 files changed, 157 insertions(+), 42 deletions(-)
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 69b8b25..ed0ca918 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -390,24 +390,32 @@ public class TransportConf {
/**
* The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during
* push-based shuffle.
- * A merged shuffle file consists of multiple small shuffle blocks. Fetching
the
- * complete merged shuffle file in a single response increases the memory
requirements for the
- * clients. Instead of serving the entire merged file, the shuffle service
serves the
- * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in
entirety and this
- * configuration controls how big a chunk can get. A corresponding index
file for each merged
- * shuffle file will be generated indicating chunk boundaries.
+ * A merged shuffle file consists of multiple small shuffle blocks. Fetching
the complete
+ * merged shuffle file in a single disk I/O increases the memory
requirements for both the
+ * clients and the external shuffle service. Instead, the external shuffle
service serves
+ * the merged file in MB-sized chunks. This configuration controls how big a
chunk can get.
+ * A corresponding index file for each merged shuffle file will be generated
indicating chunk
+ * boundaries.
+ *
+ * Setting this too high would increase the memory requirements on both the
clients and the
+ * external shuffle service.
+ *
+ * Setting this too low would increase the overall number of RPC requests to
external shuffle
+ * service unnecessarily.
*/
public int minChunkSizeInMergedShuffleFile() {
return Ints.checkedCast(JavaUtils.byteStringAsBytes(
- conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
+ conf.get("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile",
"2m")));
}
/**
- * The size of cache in memory which is used in push-based shuffle for
storing merged index files.
+ * The maximum size of cache in memory which is used in push-based shuffle
for storing merged
+ * index files. This cache is in addition to the one configured via
+ * spark.shuffle.service.index.cache.size.
*/
public long mergedIndexCacheSize() {
return JavaUtils.byteStringAsBytes(
- conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
+ conf.get("spark.shuffle.push.server.mergedIndexCacheSize", "100m"));
}
/**
@@ -417,7 +425,7 @@ public class TransportConf {
* blocks for this shuffle partition.
*/
public int ioExceptionsThresholdDuringMerge() {
- return
conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
+ return
conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
}
/**
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 10b6e3c..46b43bc 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -87,7 +87,7 @@ public class RemoteBlockPushResolverSuite {
public void before() throws IOException {
localDirs = createLocalDirs(2);
MapConfigProvider provider = new MapConfigProvider(
- ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile",
"4"));
+
ImmutableMap.of("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile",
"4"));
conf = new TransportConf("shuffle", provider);
pushResolver = new RemoteBlockPushResolver(conf);
registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY),
MERGE_DIRECTORY_META);
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 17c585d..7ed1f1d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2097,31 +2097,33 @@ package object config {
private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
ConfigBuilder("spark.shuffle.push.enabled")
- .doc("Set to 'true' to enable push-based shuffle on the client side and
this works in " +
+ .doc("Set to true to enable push-based shuffle on the client side and
this works in " +
"conjunction with the server side flag
spark.shuffle.server.mergedShuffleFileManagerImpl " +
"which needs to be set with the appropriate " +
"org.apache.spark.network.shuffle.MergedShuffleFileManager
implementation for push-based " +
"shuffle to be enabled")
- .version("3.1.0")
+ .version("3.2.0")
.booleanConf
.createWithDefault(false)
private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
- ConfigBuilder("spark.shuffle.push.merge.results.timeout")
- .doc("Specify the max amount of time DAGScheduler waits for the merge
results from " +
- "all remote shuffle services for a given shuffle. DAGScheduler will
start to submit " +
- "following stages if not all results are received within the timeout.")
+ ConfigBuilder("spark.shuffle.push.results.timeout")
+ .internal()
+ .doc("The maximum amount of time driver waits in seconds for the merge
results to be" +
+ " received from all remote external shuffle services for a given
shuffle. Driver" +
+ " submits following stages if not all results are received within the
timeout. Setting" +
+ " this too long could potentially lead to performance regression")
.version("3.2.0")
.timeConf(TimeUnit.SECONDS)
.checkValue(_ >= 0L, "Timeout must be >= 0.")
.createWithDefaultString("10s")
private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT =
- ConfigBuilder("spark.shuffle.push.merge.finalize.timeout")
- .doc("Specify the amount of time DAGScheduler waits after all mappers
finish for " +
- "a given shuffle map stage before it starts sending merge finalize
requests to " +
- "remote shuffle services. This allows the shuffle services some extra
time to " +
- "merge as many blocks as possible.")
+ ConfigBuilder("spark.shuffle.push.finalize.timeout")
+ .doc("The amount of time driver waits, after all mappers have finished
for a given" +
+ " shuffle map stage, before it sends merge finalize requests to remote
external shuffle" +
+ " services. This gives the external shuffle services extra time to
merge blocks. Setting" +
+ " this too long could potentially lead to performance regression")
.version("3.2.0")
.timeConf(TimeUnit.SECONDS)
.checkValue(_ >= 0L, "Timeout must be >= 0.")
@@ -2129,54 +2131,53 @@ package object config {
private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS =
ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations")
- .doc("Maximum number of shuffle push merger locations cached for push
based shuffle. " +
- "Currently, shuffle push merger locations are nothing but external
shuffle services " +
- "which are responsible for handling pushed blocks and merging them and
serving " +
- "merged blocks for later shuffle fetch.")
- .version("3.1.0")
+ .doc("Maximum number of merger locations cached for push-based shuffle.
Currently, merger" +
+ " locations are hosts of external shuffle services responsible for
handling pushed" +
+ " blocks, merging them and serving merged blocks for later shuffle
fetch.")
+ .version("3.2.0")
.intConf
.createWithDefault(500)
private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
- .doc("The minimum number of shuffle merger locations required to enable
push based " +
- "shuffle for a stage. This is specified as a ratio of the number of
partitions in " +
- "the child stage. For example, a reduce stage which has 100 partitions
and uses the " +
- "default value 0.05 requires at least 5 unique merger locations to
enable push based " +
- "shuffle. Merger locations are currently defined as external shuffle
services.")
- .version("3.1.0")
+ .doc("Ratio used to compute the minimum number of shuffle merger
locations required for" +
+ " a stage based on the number of partitions for the reducer stage. For
example, a reduce" +
+ " stage which has 100 partitions and uses the default value 0.05
requires at least 5" +
+ " unique merger locations to enable push-based shuffle. Merger
locations are currently" +
+ " defined as external shuffle services.")
+ .version("3.2.0")
.doubleConf
.createWithDefault(0.05)
private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
.doc(s"The static threshold for number of shuffle push merger locations
should be " +
- "available in order to enable push based shuffle for a stage. Note
this config " +
+ "available in order to enable push-based shuffle for a stage. Note
this config " +
s"works in conjunction with
${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " +
"Maximum of spark.shuffle.push.mergersMinStaticThreshold and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of
mergers needed to " +
- "enable push based shuffle for a stage. For eg: with 1000 partitions
for the child " +
+ "enable push-based shuffle for a stage. For eg: with 1000 partitions
for the child " +
"stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we
would need " +
- "at least 50 mergers to enable push based shuffle for that stage.")
- .version("3.1.0")
+ "at least 50 mergers to enable push-based shuffle for that stage.")
+ .version("3.2.0")
.intConf
.createWithDefault(5)
private[spark] val SHUFFLE_NUM_PUSH_THREADS =
ConfigBuilder("spark.shuffle.push.numPushThreads")
.doc("Specify the number of threads in the block pusher pool. These
threads assist " +
- "in creating connections and pushing blocks to remote shuffle
services. By default, the " +
- "threadpool size is equal to the number of spark executor cores.")
+ "in creating connections and pushing blocks to remote external shuffle
services. By" +
+ " default, the threadpool size is equal to the number of spark
executor cores.")
.version("3.2.0")
.intConf
.createOptional
private[spark] val SHUFFLE_MAX_BLOCK_SIZE_TO_PUSH =
ConfigBuilder("spark.shuffle.push.maxBlockSizeToPush")
- .doc("The max size of an individual block to push to the remote shuffle
services. Blocks " +
- "larger than this threshold are not pushed to be merged remotely. These
shuffle blocks " +
- "will be fetched by the executors in the original manner.")
+ .doc("The max size of an individual block to push to the remote external
shuffle services." +
+ " Blocks larger than this threshold are not pushed to be merged
remotely. These shuffle" +
+ " blocks will be fetched by the executors in the original manner.")
.version("3.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1m")
diff --git a/docs/configuration.md b/docs/configuration.md
index 1ececa33..0d8acf7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3163,3 +3163,109 @@ The stage level scheduling feature allows users to
specify task and executor res
This is only available for the RDD API in Scala, Java, and Python. It is
available on YARN and Kubernetes when dynamic allocation is enabled. See the
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page
for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this
feature. The current implementation acquires new executors for each
`ResourceProfile` created and currently has to be an exact match. Spark does
not try to fit tasks into an executor that require a different ResourceProfile
than the executor was created with. Executors that are not in use will idle
timeout with the dynamic allocation logic. The default configuration for this
feature is to only allow one Resour [...]
+
+# Push-based shuffle overview
+
+Push-based shuffle helps improve the reliability and performance of Spark
shuffle. It takes a best-effort approach to push the shuffle blocks generated
by the map tasks to remote external shuffle services to be merged per shuffle
partition. Reduce tasks fetch a combination of merged shuffle partitions and
original shuffle blocks as their input data, resulting in converting small
random disk reads by external shuffle services into large sequential reads.
Possibility of better data localit [...]
+
+<p> Push-based shuffle improves performance for long running jobs/queries
which involve large disk I/O during shuffle. Currently it is not well suited
for jobs/queries which run quickly and deal with smaller amounts of shuffle data.
This will be further improved in future releases.</p>
+
+<p> <b> Currently push-based shuffle is only supported for Spark on YARN with
external shuffle service. </b></p>
+
+### External Shuffle service(server) side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.server.mergedShuffleFileManagerImpl</code></td>
+ <td>
+ <code>org.apache.spark.network.shuffle.<br
/>NoOpMergedShuffleFileManager</code>
+ </td>
+ <td>
+ Class name of the implementation of <code>MergedShuffleFileManager</code>
that manages push-based shuffle. This acts as a server side config to disable
or enable push-based shuffle. By default, push-based shuffle is disabled at the
server side. <p> To enable push-based shuffle on the server side, set this
config to
<code>org.apache.spark.network.shuffle.RemoteBlockPushResolver</code></p>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+
<td><code>spark.shuffle.push.server.minChunkSizeInMergedShuffleFile</code></td>
+ <td><code>2m</code></td>
+ <td>
+ <p> The minimum size of a chunk when dividing a merged shuffle file into
multiple chunks during push-based shuffle. A merged shuffle file consists of
multiple small shuffle blocks. Fetching the complete merged shuffle file in a
single disk I/O increases the memory requirements for both the clients and the
external shuffle services. Instead, the external shuffle service serves the
merged file in <code>MB-sized chunks</code>.<br /> This configuration controls
how big a chunk can get. A [...]
+ <p> Setting this too high would increase the memory requirements on both
the clients and the external shuffle service. </p>
+ <p> Setting this too low would increase the overall number of RPC requests
to external shuffle service unnecessarily.</p>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.server.mergedIndexCacheSize</code></td>
+ <td><code>100m</code></td>
+ <td>
+ The maximum size of cache in memory which could be used in push-based
shuffle for storing merged index files. This cache is in addition to the one
configured via <code>spark.shuffle.service.index.cache.size</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
+
+### Client side configuration options
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
+<tr>
+ <td><code>spark.shuffle.push.enabled</code></td>
+ <td><code>false</code></td>
+ <td>
+ Set to true to enable push-based shuffle on the client side. This works in
conjunction with the server side flag
<code>spark.shuffle.push.server.mergedShuffleFileManagerImpl</code>.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.finalize.timeout</code></td>
+ <td><code>10s</code></td>
+ <td>
+ The amount of time driver waits in seconds, after all mappers have
finished for a given shuffle map stage, before it sends merge finalize requests
to remote external shuffle services. This gives the external shuffle services
extra time to merge blocks. Setting this too long could potentially lead to
performance regression.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxRetainedMergerLocations</code></td>
+ <td><code>500</code></td>
+ <td>
+ Maximum number of merger locations cached for push-based shuffle.
Currently, merger locations are hosts of external shuffle services responsible
for handling pushed blocks, merging them and serving merged blocks for later
shuffle fetch.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinThresholdRatio</code></td>
+ <td><code>0.05</code></td>
+ <td>
+ Ratio used to compute the minimum number of shuffle merger locations
required for a stage based on the number of partitions for the reducer stage.
For example, a reduce stage which has 100 partitions and uses the default value
0.05 requires at least 5 unique merger locations to enable push-based shuffle.
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.mergersMinStaticThreshold</code></td>
+ <td><code>5</code></td>
+ <td>
+ The static threshold for the number of shuffle push merger locations that should be
available in order to enable push-based shuffle for a stage. Note this config
works in conjunction with
<code>spark.shuffle.push.mergersMinThresholdRatio</code>. The number of mergers
needed to enable push-based shuffle for a stage is the maximum of
<code>spark.shuffle.push.mergersMinStaticThreshold</code> and the merger count derived from the
<code>spark.shuffle.push.mergersMinThresholdRatio</code> ratio. For example: with 1000
partitions for the child [...]
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxBlockSizeToPush</code></td>
+ <td><code>1m</code></td>
+ <td>
+ <p> The max size of an individual block to push to the remote external
shuffle services. Blocks larger than this threshold are not pushed to be merged
remotely. These shuffle blocks will be fetched in the original manner. </p>
+ <p> Setting this too high would result in more blocks being pushed to
remote external shuffle services, but those blocks are already efficiently fetched with
the existing mechanisms, so pushing them only adds the overhead of sending large
blocks to remote external shuffle services. It is recommended to set
<code>spark.shuffle.push.maxBlockSizeToPush</code> to less than the value of
<code>spark.shuffle.push.maxBlockBatchSize</code>. </p>
+ <p> Setting this too low would result in fewer blocks getting
merged; the blocks fetched directly from the mapper's external shuffle service then cause
more small random reads, affecting overall disk I/O performance. </p>
+ </td>
+ <td>3.2.0</td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.push.maxBlockBatchSize</code></td>
+ <td><code>3m</code></td>
+ <td>
+ The max size of a batch of shuffle blocks to be grouped into a single push
request. Default is set to <code>3m</code> in order to keep it slightly higher
than <code>spark.storage.memoryMapThreshold</code> default which is
<code>2m</code> as it is very likely that each batch of blocks gets memory
mapped, which incurs higher overhead.
+ </td>
+ <td>3.2.0</td>
+</tr>
+</table>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]