This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ca25534 [SPARK-37509][CORE] Improve Fallback Storage upload speed by
avoiding S3 rate limiter
ca25534 is described below
commit ca2553443977264e2e897006dc729ba61147829f
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Nov 30 15:03:00 2021 -0800
[SPARK-37509][CORE] Improve Fallback Storage upload speed by avoiding S3
rate limiter
### What changes were proposed in this pull request?
This PR aims to improve `Fallback Storage` upload speed by randomizing the
path in order to avoid S3 rate limiter.
### Why are the changes needed?
Currently, `Fallback Storage` is using `a single prefix per shuffle`. This
PR aims to randomize the upload prefixes even in a single shuffle to avoid S3
rate limiter.
-
https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
### Does this PR introduce _any_ user-facing change?
No. This is used internally during the runtime.
### How was this patch tested?
Pass the CIs to verify read and write operations. To check the layout,
check the uploaded path manually with the following configs.
```
spark.decommission.enabled true
spark.storage.decommission.enabled true
spark.storage.decommission.shuffleBlocks.enabled true
spark.storage.decommission.fallbackStorage.path file:///tmp/fallback/
```
Start one master and worker. Connect with `spark-shell` and generate
shuffle data.
```
scala> sc.parallelize(1 to 11, 10).map(x => (x % 3, 1)).reduceByKey(_ +
_).count()
res0: Long = 3
```
Invoke decommission and check. Since we have only one worker, the shuffle
data goes to the fallback storage directly.
```
$ kill -PWR <CoarseGrainedExecutorBackend JVM PID>
$ tree /tmp/fallback
/tmp/fallback
└── app-20211130135922-0001
└── 0
├── 103417883
│ └── shuffle_0_7_0.data
├── 1036881592
│ └── shuffle_0_4_0.data
├── 1094002679
│ └── shuffle_0_7_0.index
├── 1393510154
│ └── shuffle_0_6_0.index
├── 1515275369
│ └── shuffle_0_3_0.data
├── 1541340402
│ └── shuffle_0_2_0.index
├── 1639392452
│ └── shuffle_0_8_0.data
├── 1774061049
│ └── shuffle_0_9_0.index
├── 1846228218
│ └── shuffle_0_6_0.data
├── 1970345301
│ └── shuffle_0_1_0.data
├── 2073568524
│ └── shuffle_0_4_0.index
├── 227534966
│ └── shuffle_0_2_0.data
├── 266114061
│ └── shuffle_0_3_0.index
├── 413944309
│ └── shuffle_0_5_0.index
├── 581811660
│ └── shuffle_0_0_0.data
├── 705928743
│ └── shuffle_0_5_0.data
├── 713451784
│ └── shuffle_0_8_0.index
├── 861282032
│ └── shuffle_0_0_0.index
├── 912764509
│ └── shuffle_0_9_0.data
└── 946172431
└── shuffle_0_1_0.index
```
Closes #34762 from dongjoon-hyun/SPARK-37509.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/storage/FallbackStorage.scala | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 7613713..d137099 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
@@ -60,15 +61,17 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
val indexFile = r.getIndexFile(shuffleId, mapId)
if (indexFile.exists()) {
+ val hash = JavaUtils.nonNegativeHash(indexFile.getName)
fallbackFileSystem.copyFromLocalFile(
new Path(indexFile.getAbsolutePath),
- new Path(fallbackPath, s"$appId/$shuffleId/${indexFile.getName}"))
+        new Path(fallbackPath, s"$appId/$shuffleId/$hash/${indexFile.getName}"))
val dataFile = r.getDataFile(shuffleId, mapId)
if (dataFile.exists()) {
+ val hash = JavaUtils.nonNegativeHash(dataFile.getName)
fallbackFileSystem.copyFromLocalFile(
new Path(dataFile.getAbsolutePath),
- new Path(fallbackPath, s"$appId/$shuffleId/${dataFile.getName}"))
+        new Path(fallbackPath, s"$appId/$shuffleId/$hash/${dataFile.getName}"))
}
// Report block statuses
@@ -86,7 +89,8 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
}
def exists(shuffleId: Int, filename: String): Boolean = {
-    fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$filename"))
+ val hash = JavaUtils.nonNegativeHash(filename)
+    fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
}
}
@@ -168,7 +172,8 @@ private[spark] object FallbackStorage extends Logging {
}
val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
- val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+ val hash = JavaUtils.nonNegativeHash(name)
+ val indexFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val start = startReduceId * 8L
val end = endReduceId * 8L
Utils.tryWithResource(fallbackFileSystem.open(indexFile)) { inputStream =>
@@ -178,7 +183,8 @@ private[spark] object FallbackStorage extends Logging {
index.skip(end - (start + 8L))
val nextOffset = index.readLong()
val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
- val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
+ val hash = JavaUtils.nonNegativeHash(name)
+ val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val f = fallbackFileSystem.open(dataFile)
val size = nextOffset - offset
logDebug(s"To byte array $size")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]