mridulm commented on code in PR #2194:
URL:
https://github.com/apache/incubator-celeborn/pull/2194#discussion_r1442141463
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -465,43 +501,54 @@ public DiskFileInfo resolve(
int startMapIndex,
int endMapIndex)
throws IOException {
+ String indexCacheName = shuffleKey + "-" + fileId;
Map<Integer, List<ShuffleBlockInfo>> indexMap;
- if (cachedIndexMaps.containsKey(shuffleKey)
- && cachedIndexMaps.get(shuffleKey).containsKey(fileId)) {
- indexMap = cachedIndexMaps.get(shuffleKey).get(fileId);
- } else {
- FileChannel indexChannel = null;
- FSDataInputStream hdfsIndexStream = null;
- boolean isHdfs = Utils.isHdfsPath(indexFilePath);
- int indexSize = 0;
- try {
- if (isHdfs) {
- hdfsIndexStream = StorageManager.hadoopFs().open(new
Path(indexFilePath));
- indexSize =
- (int) StorageManager.hadoopFs().getFileStatus(new
Path(indexFilePath)).getLen();
- } else {
- indexChannel =
FileChannelUtils.openReadableFileChannel(indexFilePath);
- File indexFile = new File(indexFilePath);
- indexSize = (int) indexFile.length();
- }
- ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
- if (isHdfs) {
- readStreamFully(hdfsIndexStream, indexBuf, indexFilePath);
- } else {
- readChannelFully(indexChannel, indexBuf, indexFilePath);
- }
- indexBuf.rewind();
- indexMap =
ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuf);
- Map<String, Map<Integer, List<ShuffleBlockInfo>>> cacheMap =
- cachedIndexMaps.computeIfAbsent(shuffleKey, v ->
JavaUtils.newConcurrentHashMap());
- cacheMap.put(fileId, indexMap);
- } catch (Exception e) {
- logger.error("Read sorted shuffle file index " + indexFilePath + "
error, detail: ", e);
- throw new IOException("Read sorted shuffle file index failed.", e);
- } finally {
- IOUtils.closeQuietly(indexChannel, null);
- IOUtils.closeQuietly(hdfsIndexStream, null);
- }
+ try {
+ indexMap =
+ indexCache.get(
+ indexCacheName,
+ () -> {
+ FileChannel indexChannel = null;
+ FSDataInputStream hdfsIndexStream = null;
+ boolean isHdfs = Utils.isHdfsPath(indexFilePath);
+ int indexSize = 0;
+ try {
+ if (isHdfs) {
+ hdfsIndexStream = StorageManager.hadoopFs().open(new
Path(indexFilePath));
+ indexSize =
+ (int)
+ StorageManager.hadoopFs()
+ .getFileStatus(new Path(indexFilePath))
+ .getLen();
+ } else {
+ indexChannel =
FileChannelUtils.openReadableFileChannel(indexFilePath);
+ File indexFile = new File(indexFilePath);
+ indexSize = (int) indexFile.length();
+ }
+ ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
+ if (isHdfs) {
+ readStreamFully(hdfsIndexStream, indexBuf, indexFilePath);
+ } else {
+ readChannelFully(indexChannel, indexBuf, indexFilePath);
+ }
+ indexBuf.rewind();
+ Map<Integer, List<ShuffleBlockInfo>> tIndexMap =
+
ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuf);
+ Set<String> indexCacheItemsSet =
+ indexCacheNames.computeIfAbsent(shuffleKey, v -> new
HashSet<>());
Review Comment:
   QQ: Was this not done correctly? I'm trying to understand whether this is still an issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]