zhuzhurk commented on a change in pull request #16498:
URL: https://github.com/apache/flink/pull/16498#discussion_r677943607



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -99,44 +101,56 @@ public BlobCacheSizeTracker(long sizeLimit) {
     }
 
     /** Register the target file to the tracker. */
-    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+    public void track(JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(jobId);
         checkNotNull(blobKey);
         checkArgument(size >= 0);
 
         synchronized (lock) {
-            caches.put(Tuple2.of(jobId, blobKey), size);
-            if (jobId != null) {
+            if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
                 blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
-            }
-            total += size;
-            if (total > sizeLimit) {
+
+                total += size;
+                if (total > sizeLimit) {
+                    LOG.warn(
+                            "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds "

Review comment:
       `ShuffleDescriptors` -> `blob`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -99,44 +101,56 @@ public BlobCacheSizeTracker(long sizeLimit) {
     }
 
     /** Register the target file to the tracker. */
-    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+    public void track(JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(jobId);
         checkNotNull(blobKey);
         checkArgument(size >= 0);
 
         synchronized (lock) {
-            caches.put(Tuple2.of(jobId, blobKey), size);
-            if (jobId != null) {
+            if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
                 blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
-            }
-            total += size;
-            if (total > sizeLimit) {
+
+                total += size;
+                if (total > sizeLimit) {
+                    LOG.warn(
+                            "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds "
+                                    + "the limit. Limit = [{}], Current: [{}], "
+                                    + "The size of next ShuffleDescriptors: [{}].",

Review comment:
       `ShuffleDescriptors` -> `blob`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
##########
@@ -99,44 +101,56 @@ public BlobCacheSizeTracker(long sizeLimit) {
     }
 
     /** Register the target file to the tracker. */
-    public void track(@Nullable JobID jobId, BlobKey blobKey, long size) {
+    public void track(JobID jobId, BlobKey blobKey, long size) {
+        checkNotNull(jobId);
         checkNotNull(blobKey);
         checkArgument(size >= 0);
 
         synchronized (lock) {
-            caches.put(Tuple2.of(jobId, blobKey), size);
-            if (jobId != null) {
+            if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
                 blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
-            }
-            total += size;
-            if (total > sizeLimit) {
+
+                total += size;
+                if (total > sizeLimit) {
+                    LOG.warn(
+                            "The overall size of ShuffleDescriptors in PermanentBlobCache exceeds "
+                                    + "the limit. Limit = [{}], Current: [{}], "
+                                    + "The size of next ShuffleDescriptors: [{}].",
+                            sizeLimit,
+                            total,
+                            size);
+                }
+            } else {
                 LOG.warn(

Review comment:
       Can this happen frequently? If so, I would prefer to make it a debug log.
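
To make the suggestions concrete, here is a minimal, self-contained sketch of the tracking logic with the wording changed to "blob" and the duplicate case logged at a debug-like level. It is only an illustration under stated assumptions: `SimpleSizeTracker`, its `String` job and blob keys, the `boolean` return value, and the use of `java.util.logging` (with `fine` standing in for debug) are all hypothetical and are not the actual `BlobCacheSizeTracker` code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for the tracker in the diff; not Flink's class. */
class SimpleSizeTracker {
    private static final Logger LOG = Logger.getLogger("SimpleSizeTracker");

    private final long sizeLimit;
    // Key is the (jobId, blobKey) pair; a two-element list gives value-based equals/hashCode.
    private final Map<List<String>, Long> caches = new HashMap<>();
    private final Map<String, Set<String>> blobKeyByJob = new HashMap<>();
    private long total;

    SimpleSizeTracker(long sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    /** Returns true if the blob was newly tracked, false if it was already tracked. */
    synchronized boolean track(String jobId, String blobKey, long size) {
        if (jobId == null || blobKey == null) {
            throw new NullPointerException("jobId and blobKey must not be null");
        }
        if (size < 0) {
            throw new IllegalArgumentException("size must be non-negative");
        }

        if (caches.putIfAbsent(Arrays.asList(jobId, blobKey), size) == null) {
            blobKeyByJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey);
            total += size;
            if (total > sizeLimit) {
                // Wording refers to blobs in general, not to ShuffleDescriptors.
                LOG.warning(String.format(
                        "The overall size of blobs in the cache exceeds the limit. "
                                + "Limit = [%d], Current: [%d], The size of next blob: [%d].",
                        sizeLimit, total, size));
            }
            return true;
        }

        // Duplicate registration: debug-like level, in case it happens frequently.
        LOG.fine(String.format(
                "Blob [%s] of job [%s] is already tracked; ignoring.", blobKey, jobId));
        return false;
    }

    synchronized long getTotal() {
        return total;
    }
}
```

With this shape, tracking the same (job, blob) pair twice leaves the total unchanged, so a frequent duplicate costs only a low-level log line.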



