This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 8889bf0 [SPARK-33206][CORE][3.1] Fix shuffle index cache weight
calculation for small index files
8889bf0 is described below
commit 8889bf07745275b9a955ed917c3b1de12ef85a93
Author: attilapiros <[email protected]>
AuthorDate: Thu Mar 3 10:37:54 2022 -0800
[SPARK-33206][CORE][3.1] Fix shuffle index cache weight calculation for
small index files
### What changes were proposed in this pull request?
Increasing the shuffle index weight with a constant number to avoid
underestimating retained memory size caused by the bookkeeping objects: the
`java.io.File` (depending on the path ~ 960 bytes) object and the
`ShuffleIndexInformation` object (~180 bytes).
### Why are the changes needed?
Underestimating cache entry size easily can cause OOM in the Yarn
NodeManager.
In the following analyses of a prod issue (HPROF file) we can see the leak
suspect Guava's `LocalCache$Segment` objects:
<img width="943" alt="Screenshot 2022-02-17 at 18 55 40"
src="https://user-images.githubusercontent.com/2017933/154541995-44014212-2046-41d6-ba7f-99369ca7d739.png">
Going further we can see a `ShuffleIndexInformation` for a small index file
(16 bytes) but the retained heap memory is 1192 bytes:
<img width="1351" alt="image"
src="https://user-images.githubusercontent.com/2017933/154645212-e0318d0f-cefa-4ae3-8a3b-97d2b506757d.png">
Finally we can see this is very common within this heap dump (using MAT's
Object Query Language):
<img width="1418" alt="image"
src="https://user-images.githubusercontent.com/2017933/154547678-44c8af34-1765-4e14-b71a-dc03d1a304aa.png">
I have even exported the data to a CSV and done some calculations with
`awk`:
```
$ tail -n+2 export.csv | awk -F, 'BEGIN { numUnderEstimated=0; } {
sumOldSize += $1; corrected=$1 + 1176; sumCorrectedSize += corrected;
sumRetainedMem += $2; if (corrected < $2) numUnderEstimated+=1; } END { print
"sum old size: " sumOldSize / 1024 / 1024 " MB, sum corrected size: "
sumCorrectedSize / 1024 / 1024 " MB, sum retained memory:" sumRetainedMem /
1024 / 1024 " MB, num under estimated: " numUnderEstimated }'
```
It gives the followings:
```
sum old size: 76.8785 MB, sum corrected size: 1066.93 MB, sum retained
memory:1064.47 MB, num under estimated: 0
```
So using the old calculation we were at 7.6.8 MB way under the default
cache limit (100 MB).
Using the correction (applying 1176 as increment to the size) we are at
1066.93 MB (~1GB) which is close to the real retained sum heap: 1064.47 MB
(~1GB) and there is no entry which was underestimated.
But we can go further and get rid of `java.io.File` completely and store
the `ShuffleIndexInformation` for the file path.
This way not only the cache size estimate is improved but the its size is
decreased as well.
Here the path size is not counted into the cache size as that string is
interned.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
With the calculations above.
Closes #35720 from attilapiros/SPARK-33206-3.1.
Authored-by: attilapiros <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/network/shuffle/ExecutorDiskUtils.java | 7 +--
.../shuffle/ExternalShuffleBlockResolver.java | 43 +++++++------
.../network/shuffle/RemoteBlockPushResolver.java | 38 ++++++------
.../network/shuffle/ShuffleIndexInformation.java | 23 ++++---
.../shuffle/ShuffleIndexInformationSuite.java | 71 ++++++++++++++++++++++
.../network/shuffle/TestShuffleDataContext.java | 16 ++---
.../spark/shuffle/IndexShuffleBlockResolver.scala | 6 +-
.../org/apache/spark/storage/BlockManager.scala | 2 +-
.../apache/spark/storage/DiskBlockManager.scala | 2 +-
.../apache/spark/storage/BlockManagerSuite.scala | 3 +-
10 files changed, 149 insertions(+), 62 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java
index e5e61aa..2ed0718 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorDiskUtils.java
@@ -27,7 +27,7 @@ public class ExecutorDiskUtils {
* Hashes a filename into the corresponding local directory, in a manner
consistent with
* Spark's DiskBlockManager.getFile().
*/
- public static File getFile(String[] localDirs, int subDirsPerLocalDir,
String filename) {
+ public static String getFilePath(String[] localDirs, int subDirsPerLocalDir,
String filename) {
int hash = JavaUtils.nonNegativeHash(filename);
String localDir = localDirs[hash % localDirs.length];
int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
@@ -38,9 +38,8 @@ public class ExecutorDiskUtils {
// Unfortunately, we cannot just call the normalization code that
java.io.File
// uses, since it is in the package-private class java.io.FileSystem.
// So we are creating a File just to get the normalized path back to
intern it.
- // Finally a new File is built and returned with this interned normalized
path.
- final String normalizedInternedPath = new
File(notNormalizedPath).getPath().intern();
- return new File(normalizedInternedPath);
+ // We return this interned normalized path.
+ return new File(notNormalizedPath).getPath().intern();
}
}
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index a095bf2..1991d8a 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -78,7 +78,7 @@ public class ExternalShuffleBlockResolver {
* Caches index file information so that we can avoid open/close the index
files
* for each block fetch.
*/
- private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;
+ private final LoadingCache<String, ShuffleIndexInformation>
shuffleIndexCache;
// Single-threaded Java executor used to perform expensive recursive
directory deletion.
private final Executor directoryCleaner;
@@ -110,17 +110,17 @@ public class ExternalShuffleBlockResolver {
Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED,
"false"));
this.registeredExecutorFile = registeredExecutorFile;
String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size",
"100m");
- CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
- new CacheLoader<File, ShuffleIndexInformation>() {
- public ShuffleIndexInformation load(File file) throws IOException {
- return new ShuffleIndexInformation(file);
+ CacheLoader<String, ShuffleIndexInformation> indexCacheLoader =
+ new CacheLoader<String, ShuffleIndexInformation>() {
+ public ShuffleIndexInformation load(String filePath) throws
IOException {
+ return new ShuffleIndexInformation(filePath);
}
};
shuffleIndexCache = CacheBuilder.newBuilder()
.maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
- .weigher(new Weigher<File, ShuffleIndexInformation>() {
- public int weigh(File file, ShuffleIndexInformation indexInfo) {
- return indexInfo.getSize();
+ .weigher(new Weigher<String, ShuffleIndexInformation>() {
+ public int weigh(String filePath, ShuffleIndexInformation indexInfo) {
+ return indexInfo.getRetainedMemorySize();
}
})
.build(indexCacheLoader);
@@ -302,28 +302,35 @@ public class ExternalShuffleBlockResolver {
*/
private ManagedBuffer getSortBasedShuffleBlockData(
ExecutorShuffleInfo executor, int shuffleId, long mapId, int
startReduceId, int endReduceId) {
- File indexFile = ExecutorDiskUtils.getFile(executor.localDirs,
executor.subDirsPerLocalDir,
- "shuffle_" + shuffleId + "_" + mapId + "_0.index");
+ String indexFilePath =
+ ExecutorDiskUtils.getFilePath(
+ executor.localDirs,
+ executor.subDirsPerLocalDir,
+ "shuffle_" + shuffleId + "_" + mapId + "_0.index");
try {
- ShuffleIndexInformation shuffleIndexInformation =
shuffleIndexCache.get(indexFile);
+ ShuffleIndexInformation shuffleIndexInformation =
shuffleIndexCache.get(indexFilePath);
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(
startReduceId, endReduceId);
return new FileSegmentManagedBuffer(
conf,
- ExecutorDiskUtils.getFile(executor.localDirs,
executor.subDirsPerLocalDir,
- "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
+ new File(
+ ExecutorDiskUtils.getFilePath(
+ executor.localDirs,
+ executor.subDirsPerLocalDir,
+ "shuffle_" + shuffleId + "_" + mapId + "_0.data")),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
- throw new RuntimeException("Failed to open file: " + indexFile, e);
+ throw new RuntimeException("Failed to open file: " + indexFilePath, e);
}
}
public ManagedBuffer getDiskPersistedRddBlockData(
ExecutorShuffleInfo executor, int rddId, int splitIndex) {
- File file = ExecutorDiskUtils.getFile(executor.localDirs,
executor.subDirsPerLocalDir,
- "rdd_" + rddId + "_" + splitIndex);
+ File file = new File(
+ ExecutorDiskUtils.getFilePath(
+ executor.localDirs, executor.subDirsPerLocalDir, "rdd_" + rddId + "_"
+ splitIndex));
long fileLength = file.length();
ManagedBuffer res = null;
if (file.exists()) {
@@ -350,8 +357,8 @@ public class ExternalShuffleBlockResolver {
}
int numRemovedBlocks = 0;
for (String blockId : blockIds) {
- File file =
- ExecutorDiskUtils.getFile(executor.localDirs,
executor.subDirsPerLocalDir, blockId);
+ File file = new File(
+ ExecutorDiskUtils.getFilePath(executor.localDirs,
executor.subDirsPerLocalDir, blockId));
if (file.delete()) {
numRemovedBlocks++;
} else {
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 9363efc5..4924c51 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -84,7 +84,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
private final ErrorHandler.BlockPushErrorHandler errorHandler;
@SuppressWarnings("UnstableApiUsage")
- private final LoadingCache<File, ShuffleIndexInformation> indexCache;
+ private final LoadingCache<String, ShuffleIndexInformation> indexCache;
@SuppressWarnings("UnstableApiUsage")
public RemoteBlockPushResolver(TransportConf conf) {
@@ -96,15 +96,16 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
this.ioExceptionsThresholdDuringMerge =
conf.ioExceptionsThresholdDuringMerge();
- CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
- new CacheLoader<File, ShuffleIndexInformation>() {
- public ShuffleIndexInformation load(File file) throws IOException {
- return new ShuffleIndexInformation(file);
+ CacheLoader<String, ShuffleIndexInformation> indexCacheLoader =
+ new CacheLoader<String, ShuffleIndexInformation>() {
+ public ShuffleIndexInformation load(String filePath) throws
IOException {
+ return new ShuffleIndexInformation(filePath);
}
};
indexCache = CacheBuilder.newBuilder()
.maximumWeight(conf.mergedIndexCacheSize())
- .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) ->
indexInfo.getSize())
+ .weigher((Weigher<String, ShuffleIndexInformation>)
+ (filePath, indexInfo) -> indexInfo.getRetainedMemorySize())
.build(indexCacheLoader);
this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
}
@@ -130,7 +131,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
// be the first time the merge manager receives a pushed block for a
given application
// shuffle partition, or after the merged shuffle file is finalized. We
handle these
// two cases accordingly by checking if the file already exists.
- File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
+ File indexFile = new File(getMergedShuffleIndexFilePath(appShuffleId,
reduceId));
File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
try {
if (dataFile.exists()) {
@@ -164,7 +165,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
@Override
public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int
reduceId) {
AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
- File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
+ File indexFile = new File(getMergedShuffleIndexFilePath(appShuffleId,
reduceId));
if (!indexFile.exists()) {
throw new RuntimeException(String.format(
"Merged shuffle index file %s not found", indexFile.getPath()));
@@ -193,17 +194,18 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
throw new RuntimeException(String.format("Merged shuffle data file %s
not found",
dataFile.getPath()));
}
- File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
+ String indexFilePath =
+ getMergedShuffleIndexFilePath(appShuffleId, reduceId);
try {
// If we get here, the merged shuffle file should have been properly
finalized. Thus we can
// use the file length to determine the size of the merged shuffle block.
- ShuffleIndexInformation shuffleIndexInformation =
indexCache.get(indexFile);
+ ShuffleIndexInformation shuffleIndexInformation =
indexCache.get(indexFilePath);
ShuffleIndexRecord shuffleIndexRecord =
shuffleIndexInformation.getIndex(chunkId);
return new FileSegmentManagedBuffer(
conf, dataFile, shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
- "Failed to open merged shuffle index file %s", indexFile.getPath()),
e);
+ "Failed to open merged shuffle index file %s", indexFilePath), e);
}
}
@@ -211,29 +213,29 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
* The logic here is consistent with
* org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
*/
- private File getFile(String appId, String filename) {
+ private String getFilePath(String appId, String filename) {
// TODO: [SPARK-33236] Change the message when this service is able to
handle NM restart
AppPathsInfo appPathsInfo =
Preconditions.checkNotNull(appsPathInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
- File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
+ String targetFile =
ExecutorDiskUtils.getFilePath(appPathsInfo.activeLocalDirs,
appPathsInfo.subDirsPerLocalDir, filename);
- logger.debug("Get merged file {}", targetFile.getAbsolutePath());
+ logger.debug("Get merged file {}", targetFile);
return targetFile;
}
private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int
reduceId) {
String fileName = String.format("%s.data", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, fileName);
+ return new File(getFilePath(appShuffleId.appId, fileName));
}
- private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int
reduceId) {
+ private String getMergedShuffleIndexFilePath(AppShuffleId appShuffleId, int
reduceId) {
String indexName = String.format("%s.index",
generateFileName(appShuffleId, reduceId));
- return getFile(appShuffleId.appId, indexName);
+ return getFilePath(appShuffleId.appId, indexName);
}
private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int
reduceId) {
String metaName = String.format("%s.meta", generateFileName(appShuffleId,
reduceId));
- return getFile(appShuffleId.appId, metaName);
+ return new File(getFilePath(appShuffleId.appId, metaName));
}
@Override
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
index b65aacf..6669255 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -29,25 +29,28 @@ import java.nio.file.Files;
* as an in-memory LongBuffer.
*/
public class ShuffleIndexInformation {
+
+ // The estimate of `ShuffleIndexInformation` memory footprint which is
relevant in case of small
+ // index files (i.e. storing only 2 offsets = 16 bytes).
+ static final int INSTANCE_MEMORY_FOOTPRINT = 176;
+
/** offsets as long buffer */
private final LongBuffer offsets;
- private int size;
- public ShuffleIndexInformation(File indexFile) throws IOException {
- size = (int)indexFile.length();
- ByteBuffer buffer = ByteBuffer.allocate(size);
+ public ShuffleIndexInformation(String indexFilePath) throws IOException {
+ File indexFile = new File(indexFilePath);
+ ByteBuffer buffer = ByteBuffer.allocate((int)indexFile.length());
offsets = buffer.asLongBuffer();
try (DataInputStream dis = new
DataInputStream(Files.newInputStream(indexFile.toPath()))) {
dis.readFully(buffer.array());
}
}
- /**
- * Size of the index file
- * @return size
- */
- public int getSize() {
- return size;
+ public int getRetainedMemorySize() {
+ // SPARK-33206: here the offsets' capacity is multiplied by 8 as offsets
stores long values.
+ // Integer overflow won't be an issue here as long as the number of
reducers is under
+ // (Integer.MAX_VALUE - INSTANCE_MEMORY_FOOTPRINT) / 8 - 1 = 268435432.
+ return (offsets.capacity() << 3) + INSTANCE_MEMORY_FOOTPRINT;
}
/**
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleIndexInformationSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleIndexInformationSuite.java
new file mode 100644
index 0000000..c4ff893
--- /dev/null
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleIndexInformationSuite.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+public class ShuffleIndexInformationSuite {
+ private static final String sortBlock0 = "tiny block";
+ private static final String sortBlock1 = "a bit longer block";
+
+ private static TestShuffleDataContext dataContext;
+ private static String blockId;
+
+ @BeforeClass
+ public static void before() throws IOException {
+ dataContext = new TestShuffleDataContext(2, 5);
+
+ dataContext.create();
+ // Write some sort data.
+ blockId = dataContext.insertSortShuffleData(0, 0, new byte[][] {
+ sortBlock0.getBytes(StandardCharsets.UTF_8),
+ sortBlock1.getBytes(StandardCharsets.UTF_8)});
+ }
+
+ @AfterClass
+ public static void afterAll() {
+ dataContext.cleanup();
+ }
+
+ @Test
+ public void test() throws IOException {
+ String path = ExecutorDiskUtils.getFilePath(
+ dataContext.localDirs,
+ dataContext.subDirsPerLocalDir,
+ blockId + ".index");
+ ShuffleIndexInformation s = new ShuffleIndexInformation(path);
+ // the index file contains 3 offsets:
+ // 0, sortBlock0.length, sortBlock0.length + sortBlock1.length
+ assertEquals(0L, s.getIndex(0).getOffset());
+ assertEquals(sortBlock0.length(), s.getIndex(0).getLength());
+
+ assertEquals(sortBlock0.length(), s.getIndex(1).getOffset());
+ assertEquals(sortBlock1.length(), s.getIndex(1).getLength());
+
+ assertEquals((3 * 8) + ShuffleIndexInformation.INSTANCE_MEMORY_FOOTPRINT,
+ s.getRetainedMemorySize());
+ }
+}
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index fb67d72..bcf57ea 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -68,7 +68,8 @@ public class TestShuffleDataContext {
}
/** Creates reducer blocks in a sort-based data format within our local
dirs. */
- public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks)
throws IOException {
+ public String insertSortShuffleData(int shuffleId, int mapId, byte[][]
blocks)
+ throws IOException {
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
OutputStream dataStream = null;
@@ -76,10 +77,10 @@ public class TestShuffleDataContext {
boolean suppressExceptionsDuringClose = true;
try {
- dataStream = new FileOutputStream(
- ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId +
".data"));
- indexStream = new DataOutputStream(new FileOutputStream(
- ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId +
".index")));
+ dataStream = new FileOutputStream(new File(
+ ExecutorDiskUtils.getFilePath(localDirs, subDirsPerLocalDir, blockId +
".data")));
+ indexStream = new DataOutputStream(new FileOutputStream(new File(
+ ExecutorDiskUtils.getFilePath(localDirs, subDirsPerLocalDir, blockId +
".index"))));
long offset = 0;
indexStream.writeLong(offset);
@@ -93,6 +94,7 @@ public class TestShuffleDataContext {
Closeables.close(dataStream, suppressExceptionsDuringClose);
Closeables.close(indexStream, suppressExceptionsDuringClose);
}
+ return blockId;
}
/** Creates spill file(s) within the local dirs. */
@@ -122,11 +124,11 @@ public class TestShuffleDataContext {
private void insertFile(String filename, byte[] block) throws IOException {
OutputStream dataStream = null;
- File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir,
filename);
+ File file = new File(ExecutorDiskUtils.getFilePath(localDirs,
subDirsPerLocalDir, filename));
Assert.assertFalse("this test file has been already generated",
file.exists());
try {
dataStream = new FileOutputStream(
- ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename));
+ new File(ExecutorDiskUtils.getFilePath(localDirs, subDirsPerLocalDir,
filename)));
dataStream.write(block);
} finally {
Closeables.close(dataStream, false);
diff --git
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 7112ef5..1a2f619 100644
---
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -81,7 +81,8 @@ private[spark] class IndexShuffleBlockResolver(
def getDataFile(shuffleId: Int, mapId: Long, dirs: Option[Array[String]]):
File = {
val blockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
dirs
- .map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir,
blockId.name))
+ .map(d =>
+ new File(ExecutorDiskUtils.getFilePath(d,
blockManager.subDirsPerLocalDir, blockId.name)))
.getOrElse(blockManager.diskBlockManager.getFile(blockId))
}
@@ -97,7 +98,8 @@ private[spark] class IndexShuffleBlockResolver(
dirs: Option[Array[String]] = None): File = {
val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
dirs
- .map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir,
blockId.name))
+ .map(d =>
+ new File(ExecutorDiskUtils.getFilePath(d,
blockManager.subDirsPerLocalDir, blockId.name)))
.getOrElse(blockManager.diskBlockManager.getFile(blockId))
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4c09e16..3f3dc9c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1117,7 +1117,7 @@ private[spark] class BlockManager(
blockId: BlockId,
localDirs: Array[String],
blockSize: Long): Option[ManagedBuffer] = {
- val file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir,
blockId.name)
+ val file = new File(ExecutorDiskUtils.getFilePath(localDirs,
subDirsPerLocalDir, blockId.name))
if (file.exists()) {
val managedBuffer = securityManager.getIOEncryptionKey() match {
case Some(key) =>
diff --git
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 5db4965..9d92d28 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -56,7 +56,7 @@ private[spark] class DiskBlockManager(conf: SparkConf,
deleteFilesOnStop: Boolea
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
- // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
+ // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory
in that
val hash = Utils.nonNegativeHash(filename)
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cd319da..58a4f2f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -849,7 +849,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with BeforeAndAfterE
val blockSize = inv.getArguments()(2).asInstanceOf[Long]
val res = store1.readDiskBlockFromSameHostExecutor(blockId, localDirs,
blockSize)
assert(res.isDefined)
- val file = ExecutorDiskUtils.getFile(localDirs,
store1.subDirsPerLocalDir, blockId.name)
+ val file = new File(
+ ExecutorDiskUtils.getFilePath(localDirs, store1.subDirsPerLocalDir,
blockId.name))
// delete the file behind the blockId
assert(file.delete())
sameHostExecutorTried = true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]