waitinfuture commented on code in PR #2194:
URL: https://github.com/apache/incubator-celeborn/pull/2194#discussion_r1440208807
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -465,43 +501,54 @@ public DiskFileInfo resolve(
int startMapIndex,
int endMapIndex)
throws IOException {
+ String indexCacheName = shuffleKey + "-" + fileId;
Review Comment:
`fileId` is already `shuffleKey - fileName`, so there is no need to prepend
`shuffleKey` again.
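A minimal sketch of the redundancy the comment points out, assuming `fileId` is built as `shuffleKey + "-" + fileName` (the naming convention the comment describes; `buildFileId` and the sample values below are illustrative, not the actual Celeborn code):

```java
public class CacheKeyExample {
    // Assumed convention from the review comment: fileId already embeds shuffleKey.
    static String buildFileId(String shuffleKey, String fileName) {
        return shuffleKey + "-" + fileName;
    }

    public static void main(String[] args) {
        String shuffleKey = "app-1-0";
        String fileId = buildFileId(shuffleKey, "part_0-0");
        // Prepending shuffleKey again duplicates the prefix:
        String redundant = shuffleKey + "-" + fileId;
        System.out.println(fileId);    // app-1-0-part_0-0
        System.out.println(redundant); // app-1-0-app-1-0-part_0-0
        // Using fileId directly as the cache key is already unique.
    }
}
```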
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -129,6 +131,35 @@ public PartitionFilesSorter(
ThreadUtils.newDaemonCachedThreadPool(
"worker-file-sorter-execute", conf.partitionSorterThreads(), 120);
+ indexCache =
+ CacheBuilder.newBuilder()
+ .concurrencyLevel(conf.partitionSorterThreads())
+ .expireAfterAccess(conf.partitionSorterIndexExpire(), TimeUnit.MILLISECONDS)
+ .maximumWeight(indexCacheMaxWeight)
+ .weigher(
+ (key, cache) -> {
+ // estimated memory usage
+ int weight = 0;
Review Comment:
It's very hard to estimate real memory usage inside the JVM, and I doubt this
piece of code calculates it correctly. IMO @ErikFang 's suggestion is simpler
and better: using the number of `ShuffleBlockInfo` entries as the weight is
enough, and it makes the configuration easier to tune.
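A sketch of the count-based weigher the comment suggests: weigh each cached index map by its total number of `ShuffleBlockInfo` entries rather than an estimated byte size. `ShuffleBlockInfo` here is a stand-in stub, and `weigh` is the computation a Guava `Weigher<String, Map<Integer, List<ShuffleBlockInfo>>>` would run inside the `CacheBuilder` shown in the diff:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountWeigherExample {
    // Placeholder for the real org.apache.celeborn ShuffleBlockInfo class.
    static class ShuffleBlockInfo {}

    // Count-based weight: total number of blocks across all map indices.
    static int weigh(Map<Integer, List<ShuffleBlockInfo>> indexMap) {
        int blocks = 0;
        for (List<ShuffleBlockInfo> perMapBlocks : indexMap.values()) {
            blocks += perMapBlocks.size();
        }
        return blocks;
    }

    public static void main(String[] args) {
        Map<Integer, List<ShuffleBlockInfo>> indexMap = new HashMap<>();
        indexMap.put(0, Arrays.asList(new ShuffleBlockInfo(), new ShuffleBlockInfo()));
        indexMap.put(1, Arrays.asList(new ShuffleBlockInfo()));
        System.out.println(weigh(indexMap)); // 3
    }
}
```

With this weigher, `maximumWeight` becomes a cap on the total number of cached block-info entries, which is much easier to reason about in configuration than an approximation of heap bytes.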
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -465,43 +501,54 @@ public DiskFileInfo resolve(
int startMapIndex,
int endMapIndex)
throws IOException {
+ String indexCacheName = shuffleKey + "-" + fileId;
Map<Integer, List<ShuffleBlockInfo>> indexMap;
- if (cachedIndexMaps.containsKey(shuffleKey)
- && cachedIndexMaps.get(shuffleKey).containsKey(fileId)) {
- indexMap = cachedIndexMaps.get(shuffleKey).get(fileId);
- } else {
- FileChannel indexChannel = null;
- FSDataInputStream hdfsIndexStream = null;
- boolean isHdfs = Utils.isHdfsPath(indexFilePath);
- int indexSize = 0;
- try {
- if (isHdfs) {
- hdfsIndexStream = StorageManager.hadoopFs().open(new Path(indexFilePath));
- indexSize =
- (int) StorageManager.hadoopFs().getFileStatus(new Path(indexFilePath)).getLen();
- } else {
- indexChannel = FileChannelUtils.openReadableFileChannel(indexFilePath);
- File indexFile = new File(indexFilePath);
- indexSize = (int) indexFile.length();
- }
- ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
- if (isHdfs) {
- readStreamFully(hdfsIndexStream, indexBuf, indexFilePath);
- } else {
- readChannelFully(indexChannel, indexBuf, indexFilePath);
- }
- indexBuf.rewind();
- indexMap = ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuf);
- Map<String, Map<Integer, List<ShuffleBlockInfo>>> cacheMap =
- cachedIndexMaps.computeIfAbsent(shuffleKey, v -> JavaUtils.newConcurrentHashMap());
- cacheMap.put(fileId, indexMap);
- } catch (Exception e) {
- logger.error("Read sorted shuffle file index " + indexFilePath + " error, detail: ", e);
- throw new IOException("Read sorted shuffle file index failed.", e);
- } finally {
- IOUtils.closeQuietly(indexChannel, null);
- IOUtils.closeQuietly(hdfsIndexStream, null);
- }
+ try {
+ indexMap =
+ indexCache.get(
+ indexCacheName,
+ () -> {
+ FileChannel indexChannel = null;
+ FSDataInputStream hdfsIndexStream = null;
+ boolean isHdfs = Utils.isHdfsPath(indexFilePath);
+ int indexSize = 0;
+ try {
+ if (isHdfs) {
+ hdfsIndexStream = StorageManager.hadoopFs().open(new Path(indexFilePath));
+ indexSize =
+ (int)
+ StorageManager.hadoopFs()
+ .getFileStatus(new Path(indexFilePath))
+ .getLen();
+ } else {
+ indexChannel = FileChannelUtils.openReadableFileChannel(indexFilePath);
+ File indexFile = new File(indexFilePath);
+ indexSize = (int) indexFile.length();
+ }
+ ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
+ if (isHdfs) {
+ readStreamFully(hdfsIndexStream, indexBuf, indexFilePath);
+ } else {
+ readChannelFully(indexChannel, indexBuf, indexFilePath);
+ }
+ indexBuf.rewind();
+ Map<Integer, List<ShuffleBlockInfo>> tIndexMap =
+ ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuf);
+ Set<String> indexCacheItemsSet =
+ indexCacheNames.computeIfAbsent(shuffleKey, v -> new HashSet<>());
Review Comment:
I think we need to make it a concurrent set because it will be accessed by
`cleanup` in other threads.
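A sketch of the thread-safety fix the comment asks for: replace the plain `HashSet` with `ConcurrentHashMap.newKeySet()`, which returns a thread-safe `Set` backed by a `ConcurrentHashMap`, so the cache-loader thread and a `cleanup` thread can modify it without external locking. The field and method names below mirror the PR but are illustrative:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSetExample {
    // Mirrors the PR's indexCacheNames: shuffleKey -> set of cache-entry names.
    static final ConcurrentHashMap<String, Set<String>> indexCacheNames =
        new ConcurrentHashMap<>();

    static Set<String> namesFor(String shuffleKey) {
        // Instead of `v -> new HashSet<>()`, create a concurrent set so that
        // cleanup running in another thread can iterate/remove safely.
        return indexCacheNames.computeIfAbsent(
            shuffleKey, v -> ConcurrentHashMap.newKeySet());
    }

    public static void main(String[] args) {
        namesFor("shuffle-1").add("shuffle-1-file-0");
        namesFor("shuffle-1").add("shuffle-1-file-1");
        System.out.println(namesFor("shuffle-1").size()); // 2
    }
}
```

`ConcurrentHashMap.newKeySet()` is the usual drop-in choice here; `Collections.synchronizedSet(new HashSet<>())` would also work but serializes all access through a single lock.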
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]