Thesharing commented on a change in pull request #16498:
URL: https://github.com/apache/flink/pull/16498#discussion_r677984188
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -99,44 +101,56 @@ public BlobCacheSizeTracker(long sizeLimit) {
}
/** Register the target file to the tracker. */
- public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+ public void track(JobID jobId, BlobKey blobKey, long size) {
+ checkNotNull(jobId);
checkNotNull(blobKey);
checkArgument(size >= 0);
synchronized (lock) {
- caches.put(Tuple2.of(jobId, blobKey), size);
- if (jobId != null) {
+ if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
- }
- total += size;
- if (total > sizeLimit) {
+
+ total += size;
+ if (total > sizeLimit) {
+ LOG.warn(
+ "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds "
+ + "the limit. Limit = [{}], Current: [{}], "
+ + "The size of next ShuffleDescriptors: [{}].",
+ sizeLimit,
+ total,
+ size);
+ }
+ } else {
LOG.warn(
Review comment:
I think this warning shouldn't happen frequently. `BlobUtil.moveTempFileToStore`
also logs a warning when the blob key is a duplicate. Furthermore, the previous
warning ("The overall size of ShuffleDescriptors in PermanentBlobCache exceeds ...")
should stay at WARN level, too. The overall size exceeds the limit if and only if
the total size of the cached blobs exceeds 100MiB, and in this scenario the user
should be warned about it.
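For illustration, a minimal, self-contained Java sketch of the dedup-and-warn logic in the new `track` shown in the hunk above: `putIfAbsent` only returns `null` for a key that was not yet present, so a duplicate registration never adds its size to `total` a second time. Assumptions, all hypothetical and not Flink's API: `String` ids stand in for `JobID`/`BlobKey`, a `List.of(jobId, blobKey)` pair stands in for `Tuple2`, `java.util.logging` stands in for SLF4J, and the duplicate-warning text is invented because the original `LOG.warn(` call is cut off in the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for BlobCacheSizeTracker#track (not Flink code). */
class SizeTrackerSketch {
    private static final Logger LOG = Logger.getLogger(SizeTrackerSketch.class.getName());

    private final Object lock = new Object();
    private final long sizeLimit;
    // (jobId, blobKey) -> size; List.of(...) is used as an immutable pair key.
    private final Map<List<String>, Long> caches = new HashMap<>();
    private final Map<String, Set<String>> blobKeyByJob = new HashMap<>();
    private long total = 0L;

    SizeTrackerSketch(long sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    /** Returns true if the blob was newly tracked, false if it was already tracked. */
    boolean track(String jobId, String blobKey, long size) {
        Objects.requireNonNull(jobId);
        Objects.requireNonNull(blobKey);
        if (size < 0) {
            throw new IllegalArgumentException("size must be >= 0");
        }
        synchronized (lock) {
            // putIfAbsent returns null only if the key was absent, so duplicates
            // are neither re-indexed nor counted twice in total.
            if (caches.putIfAbsent(List.of(jobId, blobKey), size) == null) {
                blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
                total += size;
                if (total > sizeLimit) {
                    LOG.warning(String.format(
                            "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds "
                                    + "the limit. Limit = [%d], Current: [%d], "
                                    + "The size of next ShuffleDescriptors: [%d].",
                            sizeLimit, total, size));
                }
                return true;
            } else {
                // Hypothetical message; the real text is truncated in the diff.
                LOG.warning(String.format("Blob %s of job %s is already tracked.", blobKey, jobId));
                return false;
            }
        }
    }

    long getTotal() {
        synchronized (lock) {
            return total;
        }
    }
}
```

With a limit of 100, tracking the same 60-byte blob twice leaves `getTotal()` at 60; a second, distinct 50-byte blob raises it to 110 and logs the limit warning.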
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
##########
@@ -50,13 +50,20 @@
/**
* Returns the content of the file for the BLOB with the provided job ID and the blob key.
*
+ * <p>Compared to {@code getFile}, {@code readFile} will attempt to read the entire file after
+ * retrieving it. If file retrieval and file reading are done under the same WRITE lock, it
+ * avoids the scenario in which the file is deleted concurrently by another thread while it is
+ * being retrieved and read.
Review comment:
Thank you for pointing this out. Resolved.
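A minimal sketch of the idea in the `readFile` javadoc above, assuming a plain local directory and a `ReentrantReadWriteLock`: resolving the path and reading its bytes happen inside one write-locked section, and deletion takes the same write lock, so a delete cannot land between the two steps. `BlobStoreSketch`, its `String` keys and its method bodies are hypothetical illustrations, not `PermanentBlobCache`'s implementation (which may also download missing blobs during retrieval).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: retrieval and reading share one write-locked section. */
class BlobStoreSketch {
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Path storageDir;

    BlobStoreSketch(Path storageDir) {
        this.storageDir = storageDir;
    }

    /** Like getFile: returns only the path; the file may be deleted before the caller reads it. */
    Path getFile(String key) {
        readWriteLock.readLock().lock();
        try {
            return storageDir.resolve(key);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

    /** Like readFile: resolves and reads the file without releasing the lock in between. */
    byte[] readFile(String key) throws IOException {
        readWriteLock.writeLock().lock();
        try {
            Path file = storageDir.resolve(key);
            return Files.readAllBytes(file);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    /** Cleanup also takes the write lock, so it cannot run between resolve and read above. */
    void deleteFile(String key) throws IOException {
        readWriteLock.writeLock().lock();
        try {
            Files.deleteIfExists(storageDir.resolve(key));
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}
```

In this simplified sketch a read lock in `readFile` would already exclude `deleteFile`; the write lock mirrors the javadoc, presumably because retrieval in the real cache can also write the file locally.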
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -99,44 +101,56 @@ public BlobCacheSizeTracker(long sizeLimit) {
}
/** Register the target file to the tracker. */
- public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+ public void track(JobID jobId, BlobKey blobKey, long size) {
+ checkNotNull(jobId);
checkNotNull(blobKey);
checkArgument(size >= 0);
synchronized (lock) {
- caches.put(Tuple2.of(jobId, blobKey), size);
- if (jobId != null) {
+ if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
- }
- total += size;
- if (total > sizeLimit) {
+
+ total += size;
+ if (total > sizeLimit) {
+ LOG.warn(
+ "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds "
+ + "the limit. Limit = [{}], Current: [{}], "
+ + "The size of next ShuffleDescriptors: [{}].",
Review comment:
Resolved.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -99,44 +101,56 @@ public BlobCacheSizeTracker(long sizeLimit) {
}
/** Register the target file to the tracker. */
- public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+ public void track(JobID jobId, BlobKey blobKey, long size) {
+ checkNotNull(jobId);
checkNotNull(blobKey);
checkArgument(size >= 0);
synchronized (lock) {
- caches.put(Tuple2.of(jobId, blobKey), size);
- if (jobId != null) {
+ if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
- }
- total += size;
- if (total > sizeLimit) {
+
+ total += size;
+ if (total > sizeLimit) {
+ LOG.warn(
+ "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds "
Review comment:
Thank you for pointing this out. Resolved.
--
This is an automated message from the Apache Git Service.