This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 8165f42  STORM-3769 prevent topology blobs from being considered downloaded if any of their blobs was deleted (#3396)
8165f42 is described below

commit 8165f427729eccc0b018ed5b62610b7bf714cb3c
Author: agresch <[email protected]>
AuthorDate: Mon May 3 13:26:20 2021 -0500

    STORM-3769 prevent topology blobs from being considered downloaded if any of their blobs was deleted (#3396)
---
 storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java  |  6 ++++++
 .../main/java/org/apache/storm/localizer/AsyncLocalizer.java  | 11 ++++++++++-
 .../apache/storm/localizer/LocalizedResourceRetentionSet.java |  9 ++++++++-
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
index fe6d124..68e346d 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -337,6 +337,12 @@ public class ConfigUtils {
         return (topologyId + "-stormconf.ser");
     }
 
+    /**
+     * Returns the topology ID belonging to a blob key if it exists.
+     *
+     * @param key the blob key
+     * @return the topology id belonging to the key if it can be inferred.  Returns null otherwise.
+     */
     public static String getIdFromBlobKey(String key) {
         if (key == null) {
             return null;
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 32015ce..8e2c942 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -630,14 +630,23 @@ public class AsyncLocalizer implements AutoCloseable {
             }
 
             toClean.addResources(topologyBlobs);
+            Set<String> topologiesWithDeletes = new HashSet<>();
             try (ClientBlobStore store = getClientBlobStore()) {
-                toClean.cleanup(store);
+                Set<LocallyCachedBlob> deletedBlobs = toClean.cleanup(store);
+                for (LocallyCachedBlob deletedBlob : deletedBlobs) {
+                    String topologyId = 
ConfigUtils.getIdFromBlobKey(deletedBlob.getKey());
+                    if (topologyId != null) {
+                        topologiesWithDeletes.add(topologyId);
+                    }
+                }
             }
 
             HashSet<String> safeTopologyIds = new HashSet<>();
             for (String blobKey : topologyBlobs.keySet()) {
                 safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
             }
+            LOG.debug("Topologies {} can no longer be considered fully 
downloaded", topologiesWithDeletes);
+            safeTopologyIds.removeAll(topologiesWithDeletes);
 
             //Deleting this early does not hurt anything
             topologyBasicDownloaded.keySet().removeIf(topoId -> 
!safeTopologyIds.contains(topoId));
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 3c31d4e..4d4360f 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -13,8 +13,10 @@
 package org.apache.storm.localizer;
 
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentMap;
@@ -77,8 +79,10 @@ public class LocalizedResourceRetentionSet {
      * Actually cleanup the blobs to try and get below the target cache size.
      * @param store the blobs store client used to check if the blob has been deleted from the blobstore.  If it has, the blob will be
      *     deleted even if the cache is not over the target size.
+     * @return a set containing any deleted blobs.
      */
-    public void cleanup(ClientBlobStore store) {
+    public Set<LocallyCachedBlob> cleanup(ClientBlobStore store) {
+        Set<LocallyCachedBlob> deleted = new HashSet<>();
         LOG.debug("cleanup target size: {} current size is: {}", targetSize, 
currentSize);
         long bytesOver = currentSize - targetSize;
         //First delete everything that no longer exists...
@@ -93,6 +97,7 @@ public class LocalizedResourceRetentionSet {
                     if (removeBlob(resource, set)) {
                         bytesOver -= resource.getSizeOnDisk();
                         LOG.info("Deleted blob: {} (REMOVED FROM CLUSTER).", 
resource.getKey());
+                        deleted.add(resource);
                         i.remove();
                     }
                 }
@@ -109,9 +114,11 @@ public class LocalizedResourceRetentionSet {
             if (removeBlob(resource, set)) {
                 bytesOver -= resource.getSizeOnDisk();
                 LOG.info("Deleted blob: {} (OVER SIZE LIMIT).", 
resource.getKey());
+                deleted.add(resource);
                 i.remove();
             }
         }
+        return deleted;
     }
 
     private boolean removeBlob(LocallyCachedBlob blob, Map<String, ? extends 
LocallyCachedBlob> blobs) {

Reply via email to