This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 8165f42 STORM-3769 prevent topology blobs from being considered
downloaded if any of their blobs was deleted (#3396)
8165f42 is described below
commit 8165f427729eccc0b018ed5b62610b7bf714cb3c
Author: agresch <[email protected]>
AuthorDate: Mon May 3 13:26:20 2021 -0500
STORM-3769 prevent topology blobs from being considered downloaded if any
of their blobs was deleted (#3396)
---
storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java | 6 ++++++
.../main/java/org/apache/storm/localizer/AsyncLocalizer.java | 11 ++++++++++-
.../apache/storm/localizer/LocalizedResourceRetentionSet.java | 9 ++++++++-
3 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
index fe6d124..68e346d 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -337,6 +337,12 @@ public class ConfigUtils {
return (topologyId + "-stormconf.ser");
}
+ /**
+ * Returns the topology ID belonging to a blob key if it exists.
+ *
+ * @param key the blob key
+ * @return the topology id belonging to the key if it can be inferred. Returns null otherwise.
+ */
public static String getIdFromBlobKey(String key) {
if (key == null) {
return null;
diff --git
a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 32015ce..8e2c942 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -630,14 +630,23 @@ public class AsyncLocalizer implements AutoCloseable {
}
toClean.addResources(topologyBlobs);
+ Set<String> topologiesWithDeletes = new HashSet<>();
try (ClientBlobStore store = getClientBlobStore()) {
- toClean.cleanup(store);
+ Set<LocallyCachedBlob> deletedBlobs = toClean.cleanup(store);
+ for (LocallyCachedBlob deletedBlob : deletedBlobs) {
+ String topologyId =
ConfigUtils.getIdFromBlobKey(deletedBlob.getKey());
+ if (topologyId != null) {
+ topologiesWithDeletes.add(topologyId);
+ }
+ }
}
HashSet<String> safeTopologyIds = new HashSet<>();
for (String blobKey : topologyBlobs.keySet()) {
safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
}
+ LOG.debug("Topologies {} can no longer be considered fully
downloaded", topologiesWithDeletes);
+ safeTopologyIds.removeAll(topologiesWithDeletes);
//Deleting this early does not hurt anything
topologyBasicDownloaded.keySet().removeIf(topoId ->
!safeTopologyIds.contains(topoId));
diff --git
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 3c31d4e..4d4360f 100644
---
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -13,8 +13,10 @@
package org.apache.storm.localizer;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
@@ -77,8 +79,10 @@ public class LocalizedResourceRetentionSet {
* Actually cleanup the blobs to try and get below the target cache size.
* @param store the blobs store client used to check if the blob has been deleted from the blobstore. If it has, the blob will be
* deleted even if the cache is not over the target size.
+ * @return a set containing any deleted blobs.
*/
- public void cleanup(ClientBlobStore store) {
+ public Set<LocallyCachedBlob> cleanup(ClientBlobStore store) {
+ Set<LocallyCachedBlob> deleted = new HashSet<>();
LOG.debug("cleanup target size: {} current size is: {}", targetSize,
currentSize);
long bytesOver = currentSize - targetSize;
//First delete everything that no longer exists...
@@ -93,6 +97,7 @@ public class LocalizedResourceRetentionSet {
if (removeBlob(resource, set)) {
bytesOver -= resource.getSizeOnDisk();
LOG.info("Deleted blob: {} (REMOVED FROM CLUSTER).",
resource.getKey());
+ deleted.add(resource);
i.remove();
}
}
@@ -109,9 +114,11 @@ public class LocalizedResourceRetentionSet {
if (removeBlob(resource, set)) {
bytesOver -= resource.getSizeOnDisk();
LOG.info("Deleted blob: {} (OVER SIZE LIMIT).",
resource.getKey());
+ deleted.add(resource);
i.remove();
}
}
+ return deleted;
}
private boolean removeBlob(LocallyCachedBlob blob, Map<String, ? extends
LocallyCachedBlob> blobs) {