http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java
index f38f6f6..862349b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java
@@ -15,30 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.localizer;
 
 /**
- * Local Resource requested by the topology
+ * Local Resource requested by the topology.
  */
 public class LocalResource {
-  private String _blobKey;
-  private boolean _uncompress;
+    private final boolean needsCallback;
+    private final String blobKey;
+    private final boolean uncompress;
+
+    /**
+     * Constructor.
+     * @param keyname the key of the blob to download.
+     * @param uncompress should the blob be uncompressed or not.
+     * @param needsCallback if the blobs changes should a callback happen so 
the worker is restarted.
+     */
+    public LocalResource(String keyname, boolean uncompress, boolean 
needsCallback) {
+        blobKey = keyname;
+        this.uncompress = uncompress;
+        this.needsCallback = needsCallback;
+    }
 
-  public LocalResource(String keyname, boolean uncompress) {
-    _blobKey = keyname;
-    _uncompress = uncompress;
-  }
+    public String getBlobName() {
+        return blobKey;
+    }
 
-  public String getBlobName() {
-    return _blobKey;
-  }
+    public boolean shouldUncompress() {
+        return uncompress;
+    }
 
-  public boolean shouldUncompress() {
-    return _uncompress;
-  }
+    public boolean needsCallback() {
+        return needsCallback;
+    }
 
-  @Override
-  public String toString() {
-    return "Key: " + _blobKey + " uncompress: " + _uncompress;
-  }
+    @Override
+    public String toString() {
+        return "Key: " + blobKey + " uncompress: " + uncompress;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index 7241976..4f01c30 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -15,129 +15,434 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.localizer;
 
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.ShellUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * Represents a resource that is localized on the supervisor.
  * A localized resource has a .current symlink to the current version file 
which is named
  * filename.{current version}. There is also a filename.version which contains 
the latest version.
  */
-public class LocalizedResource {
-  public static final Logger LOG = 
LoggerFactory.getLogger(LocalizedResource.class);
-
-  // filesystem path to the resource
-  private final String _localPath;
-  private final String _versionFilePath;
-  private final String _symlinkPath;
-  private final String _key;
-  private final boolean _uncompressed;
-  // _size of the resource
-  private long _size = -1;
-  // queue of topologies referencing resource
-  private final Set<String> _ref;
-  // last access time of the resource -> increment when topology finishes 
using it
-  private final AtomicLong _lastAccessTime = new AtomicLong(currentTime());
-
-  public LocalizedResource(String key, String fileLoc, boolean uncompressed) {
-    _ref = new HashSet<String>();
-    _localPath = fileLoc;
-    _versionFilePath = ServerUtils.constructVersionFileName(fileLoc);
-    _symlinkPath = ServerUtils.constructBlobCurrentSymlinkName(fileLoc);
-    _uncompressed = uncompressed;
-    _key = key;
-    // we trust that the file exists
-    _size = ServerUtils.getDU(new File(getFilePathWithVersion()));
-    LOG.debug("size of {} is: {}", fileLoc, _size);
-  }
-
-  // create local resource and add reference
-  public LocalizedResource(String key, String fileLoc, boolean uncompressed, 
String topo) {
-    this(key, fileLoc, uncompressed);
-    _ref.add(topo);
-  }
-
-  public boolean isUncompressed() {
-    return _uncompressed;
-  }
-
-  public String getKey() {
-    return _key;
-  }
-
-  public String getCurrentSymlinkPath() {
-    return _symlinkPath;
-  }
-
-  public String getVersionFilePath() {
-    return _versionFilePath;
-  }
-
-  public String getFilePathWithVersion() {
-    long version = ServerUtils.localVersionOfBlob(_localPath);
-    return ServerUtils.constructBlobWithVersionFileName(_localPath, version);
-  }
-
-  public String getFilePath() {
-    return _localPath;
-  }
-
-  public void addReference(String topo) {
-    _ref.add(topo);
-  }
-
-  public void removeReference(String topo) {
-    if (!_ref.remove(topo)) {
-      LOG.warn("Tried to remove a reference to a topology that doesn't use 
this resource");
-    }
-    setTimestamp();
-  }
-
-  // The last access time is only valid if the resource doesn't have any 
references.
-  public long getLastAccessTime() {
-    return _lastAccessTime.get();
-  }
-
-  // for testing
-  protected void setSize(long size) {
-    _size = size;
-  }
-
-  public long getSize() {
-    return _size;
-  }
-
-  private void setTimestamp() {
-    _lastAccessTime.set(currentTime());
-  }
-
-  public int getRefCount() {
-    return _ref.size();
-  }
-
-  private long currentTime() {
-    return System.nanoTime();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof LocalizedResource) {
-        LocalizedResource l = (LocalizedResource)other;
-        return _key.equals(l._key) && _uncompressed == l._uncompressed && 
_localPath.equals(l._localPath);
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-     return _key.hashCode() + Boolean.hashCode(_uncompressed) + 
_localPath.hashCode();
-  }
+public class LocalizedResource extends LocallyCachedBlob {
+    private static final Logger LOG = 
LoggerFactory.getLogger(LocalizedResource.class);
+    @VisibleForTesting
+    static final String CURRENT_BLOB_SUFFIX = ".current";
+    @VisibleForTesting
+    static final String BLOB_VERSION_SUFFIX = ".version";
+    @VisibleForTesting
+    static final String FILECACHE = "filecache";
+    @VisibleForTesting
+    static final String USERCACHE = "usercache";
+    // sub directories to store either files or uncompressed archives 
respectively
+    @VisibleForTesting
+    static final String FILESDIR = "files";
+    @VisibleForTesting
+    static final String ARCHIVESDIR = "archives";
+    private static final String TO_UNCOMPRESS = "_tmp_";
+
+    private static Path constructVersionFileName(Path baseDir, String key) {
+        return baseDir.resolve(key + BLOB_VERSION_SUFFIX);
+    }
+
+    @VisibleForTesting
+    static long localVersionOfBlob(Path versionFile) {
+        long currentVersion = -1;
+        if (Files.exists(versionFile) && !(Files.isDirectory(versionFile))) {
+            try (BufferedReader br = new BufferedReader(new 
FileReader(versionFile.toFile()))) {
+                String line = br.readLine();
+                currentVersion = Long.parseLong(line);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return currentVersion;
+    }
+
+    private static Path constructBlobCurrentSymlinkName(Path baseDir, String 
key) {
+        return baseDir.resolve(key + CURRENT_BLOB_SUFFIX);
+    }
+
+    private static Path constructBlobWithVersionFileName(Path baseDir, String 
key, long version) {
+        return baseDir.resolve(key + "." + version);
+    }
+
+    static Collection<String> getLocalizedUsers(Path localBaseDir) throws 
IOException {
+        Path userCacheDir = getUserCacheDir(localBaseDir);
+        if (!Files.exists(userCacheDir)) {
+            return Collections.emptyList();
+        }
+        return Files.list(userCacheDir).map((p) -> 
p.getFileName().toString()).collect(Collectors.toList());
+    }
+
+    static void completelyRemoveUnusedUser(Path localBaseDir, String user) 
throws IOException {
+        Path userFileCacheDir = getLocalUserFileCacheDir(localBaseDir, user);
+        // baseDir/supervisor/usercache/user1/filecache/files
+        Files.deleteIfExists(getCacheDirForFiles(userFileCacheDir));
+        // baseDir/supervisor/usercache/user1/filecache/archives
+        Files.deleteIfExists(getCacheDirForArchives(userFileCacheDir));
+        // baseDir/supervisor/usercache/user1/filecache
+        Files.deleteIfExists(userFileCacheDir);
+        // baseDir/supervisor/usercache/user1
+        Files.deleteIfExists(getLocalUserDir(localBaseDir, user));
+    }
+
+    static List<String> getLocalizedArchiveKeys(Path localBaseDir, String 
user) throws IOException {
+        Path dir = 
getCacheDirForArchives(getLocalUserFileCacheDir(localBaseDir, user));
+        return readKeysFromDir(dir);
+    }
+
+    static List<String> getLocalizedFileKeys(Path localBaseDir, String user) 
throws IOException {
+        Path dir = getCacheDirForFiles(getLocalUserFileCacheDir(localBaseDir, 
user));
+        return readKeysFromDir(dir);
+    }
+
+    // Looks for files in the directory with .current suffix
+    private static List<String> readKeysFromDir(Path dir) throws IOException {
+        if (!Files.exists(dir)) {
+            return Collections.emptyList();
+        }
+        return Files.list(dir)
+            .map((p) -> p.getFileName().toString())
+            .filter((name) -> name.toLowerCase().endsWith(CURRENT_BLOB_SUFFIX))
+            .map((key) -> {
+                int p = key.lastIndexOf('.');
+                if (p > 0) {
+                    key = key.substring(0, p);
+                }
+                return key;
+            })
+            .collect(Collectors.toList());
+    }
+
+    // baseDir/supervisor/usercache/
+    private static Path getUserCacheDir(Path localBaseDir) {
+        return localBaseDir.resolve(USERCACHE);
+    }
+
+    // baseDir/supervisor/usercache/user1/
+    static Path getLocalUserDir(Path localBaseDir, String userName) {
+        return getUserCacheDir(localBaseDir).resolve(userName);
+    }
+
+    // baseDir/supervisor/usercache/user1/filecache
+    static Path getLocalUserFileCacheDir(Path localBaseDir, String userName) {
+        return getLocalUserDir(localBaseDir, userName).resolve(FILECACHE);
+    }
+
+    // baseDir/supervisor/usercache/user1/filecache/files
+    private static Path getCacheDirForFiles(Path dir) {
+        return dir.resolve(FILESDIR);
+    }
+
+    // get the directory to put uncompressed archives in
+    // baseDir/supervisor/usercache/user1/filecache/archives
+    private static Path getCacheDirForArchives(Path dir) {
+        return dir.resolve(ARCHIVESDIR);
+    }
+
+    // filesystem path to the resource
+    private final Path baseDir;
+    private final Path versionFilePath;
+    private final Path symlinkPath;
+    private final boolean uncompressed;
+    private final IAdvancedFSOps fsOps;
+    private final String user;
+    // size of the resource
+    private long size = -1;
+    private final Map<String, Object> conf;
+
+    LocalizedResource(String key, Path localBaseDir, boolean uncompressed, 
IAdvancedFSOps fsOps, Map<String, Object> conf,
+                             String user) {
+        super(key + (uncompressed ? " archive" : " file"), key);
+        Path base = getLocalUserFileCacheDir(localBaseDir, user);
+        this.baseDir = uncompressed ? getCacheDirForArchives(base) : 
getCacheDirForFiles(base);
+        this.conf = conf;
+        this.user = user;
+        this.fsOps = fsOps;
+        versionFilePath = constructVersionFileName(baseDir, key);
+        symlinkPath = constructBlobCurrentSymlinkName(baseDir, key);
+        this.uncompressed = uncompressed;
+        //Set the size in case we are recovering an already downloaded object
+        setSize();
+    }
+
+    Path getCurrentSymlinkPath() {
+        return symlinkPath;
+    }
+
+    @VisibleForTesting
+    Path getFilePathWithVersion() {
+        return constructBlobWithVersionFileName(baseDir, getKey(), 
getLocalVersion());
+    }
+
+    private void setSize() {
+        // we trust that the file exists
+        Path withVersion = getFilePathWithVersion();
+        size = ServerUtils.getDU(withVersion.toFile());
+        LOG.debug("size of {} is: {}", withVersion, size);
+    }
+
+    @VisibleForTesting
+    protected void setSize(long size) {
+        this.size = size;
+    }
+
+    @Override
+    public long getLocalVersion() {
+        return localVersionOfBlob(versionFilePath);
+    }
+
+    @Override
+    public long getRemoteVersion(ClientBlobStore store) throws 
KeyNotFoundException, AuthorizationException {
+        return ServerUtils.nimbusVersionOfBlob(getKey(), store);
+    }
+
+    @Override
+    public long downloadToTempLocation(ClientBlobStore store) throws 
IOException, KeyNotFoundException, AuthorizationException {
+        String key = getKey();
+        ReadableBlobMeta meta = store.getBlobMeta(key);
+        if (!ServerUtils.canUserReadBlob(meta, user, conf)) {
+            throw new AuthorizationException(user + " does not have READ 
access to " + key);
+        }
+        long version;
+        Path downloadFile;
+        Path finalLocation;
+        try (InputStreamWithMeta in = store.getBlob(key)) {
+            version = in.getVersion();
+            finalLocation = constructBlobWithVersionFileName(baseDir, 
getKey(), version);
+            if (uncompressed) {
+                // we need to download to temp file and then unpack into the 
one requested
+                downloadFile = tmpOutputLocation();
+            } else {
+                downloadFile = finalLocation;
+            }
+            byte[] buffer = new byte[1024];
+            int len;
+            LOG.debug("Downloading {} to {}", key, downloadFile);
+            Path parent = downloadFile.getParent();
+            if (!Files.exists(parent)) {
+                Files.createDirectory(parent);
+            }
+            try (FileOutputStream out = new 
FileOutputStream(downloadFile.toFile())) {
+                while ((len = in.read(buffer)) >= 0) {
+                    out.write(buffer, 0, len);
+                }
+            }
+        }
+        if (uncompressed) {
+            ServerUtils.unpack(downloadFile.toFile(), finalLocation.toFile());
+            LOG.debug("Uncompressed {} to: {}", downloadFile, finalLocation);
+        }
+        setBlobPermissions(conf, user, finalLocation);
+        return version;
+    }
+
+    @Override
+    public void commitNewVersion(long version) throws IOException {
+        String key = getKey();
+        LOG.info("Blob: {} updated to version {} from version {}", key, 
version, getLocalVersion());
+        Path localVersionFile = versionFilePath;
+        // The false parameter ensures overwriting the version file, not 
appending
+        try (PrintWriter writer = new PrintWriter(
+            new BufferedWriter(new FileWriter(localVersionFile.toFile(), 
false)))) {
+            writer.println(version);
+        }
+        setBlobPermissions(conf, user, localVersionFile);
+
+        // Update the key.current symlink. First create tmp symlink and do
+        // move of tmp to current so that the operation is atomic.
+        Path tmpSymlink = tmpSymlinkLocation();
+        Path targetOfSymlink = constructBlobWithVersionFileName(baseDir, 
getKey(), version);
+        LOG.debug("Creating a symlink @{} linking to: {}", tmpSymlink, 
targetOfSymlink);
+        Files.createSymbolicLink(tmpSymlink, targetOfSymlink);
+
+        Path currentSymLink = getCurrentSymlinkPath();
+        Files.move(tmpSymlink, currentSymLink, ATOMIC_MOVE);
+        //Update the size of the objects
+        setSize();
+    }
+
+    private void setBlobPermissions(Map<String, Object> conf, String user, 
Path path)
+        throws IOException {
+
+        if 
(!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
+            return;
+        }
+        String wlCommand = 
ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), "");
+        if (wlCommand.isEmpty()) {
+            String stormHome = System.getProperty("storm.home");
+            wlCommand = stormHome + "/bin/worker-launcher";
+        }
+        List<String> command = new ArrayList<>(Arrays.asList(wlCommand, user, 
"blob", path.toString()));
+
+        String[] commandArray = command.toArray(new String[command.size()]);
+        ShellUtils.ShellCommandExecutor shExec = new 
ShellUtils.ShellCommandExecutor(commandArray);
+        LOG.debug("Setting blob permissions, command: {}", 
Arrays.toString(commandArray));
+
+        try {
+            shExec.execute();
+            LOG.debug("output: {}", shExec.getOutput());
+        } catch (ShellUtils.ExitCodeException e) {
+            int exitCode = shExec.getExitCode();
+            LOG.warn("Exit code from worker-launcher is: {}", exitCode, e);
+            LOG.debug("output: {}", shExec.getOutput());
+            throw new IOException("Setting blob permissions failed"
+                + " (exitCode=" + exitCode + ") with output: " + 
shExec.getOutput(), e);
+        }
+    }
+
+    private Path tmpOutputLocation() {
+        return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + 
getKey()));
+    }
+
+    private Path tmpSymlinkLocation() {
+        return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + 
getKey() + CURRENT_BLOB_SUFFIX));
+    }
+
+    private static final Pattern VERSION_FILE_PATTERN = 
Pattern.compile("^(.+)\\.(\\d+)$");
+
+    @Override
+    public void cleanupOrphanedData() throws IOException {
+        //There are a few possible files that we would want to clean up
+        //baseDir + "/" + "_tmp_" + baseName
+        //baseDir + "/" + "_tmp_" + baseName + ".current"
+        //baseDir + "/" + baseName.<VERSION>
+        //baseDir + "/" + baseName.current
+        //baseDir + "/" + baseName.version
+        //In general we always want to delete the _tmp_ files if they are 
there.
+
+        Path tmpOutput = tmpOutputLocation();
+        Files.deleteIfExists(tmpOutput);
+        Path tmpSym = tmpSymlinkLocation();
+        Files.deleteIfExists(tmpSym);
+
+        try {
+            String baseName = getKey();
+            long version = getLocalVersion();
+            Path current = getCurrentSymlinkPath();
+
+            //If .current and .version do not match, we roll back the .version 
file to match
+            // what .current is pointing to.
+            if (Files.exists(current) && Files.isSymbolicLink(current)) {
+                Path versionFile = Files.readSymbolicLink(current);
+                Matcher m = 
VERSION_FILE_PATTERN.matcher(versionFile.getFileName().toString());
+                if (m.matches()) {
+                    long foundVersion = Long.valueOf(m.group(2));
+                    if (foundVersion != version) {
+                        LOG.error("{} does not match the version file so fix 
the version file", current);
+                        //The versions are different so roll back to whatever 
current is
+                        try (PrintWriter restoreWriter = new PrintWriter(
+                            new BufferedWriter(new 
FileWriter(versionFilePath.toFile(), false)))) {
+                            restoreWriter.println(foundVersion);
+                        }
+                        version = foundVersion;
+                    }
+                }
+            }
+
+            // Finally delete any baseName.<VERSION> files that are not 
pointed to by the current version
+            final long finalVersion = version;
+            LOG.debug("Looking to clean up after {} in {}", getKey(), baseDir);
+            try (DirectoryStream<Path> ds = fsOps.newDirectoryStream(baseDir, 
(path) -> {
+                Matcher m = 
VERSION_FILE_PATTERN.matcher(path.getFileName().toString());
+                if (m.matches()) {
+                    long foundVersion = Long.valueOf(m.group(2));
+                    return m.group(1).equals(baseName) && foundVersion != 
finalVersion;
+                }
+                return false;
+            })) {
+                for (Path p : ds) {
+                    LOG.info("Cleaning up old localized resource file {}", p);
+                    if (Files.isDirectory(p)) {
+                        FileUtils.deleteDirectory(p.toFile());
+                    } else {
+                        fsOps.deleteIfExists(p.toFile());
+                    }
+                }
+            }
+        } catch (NoSuchFileException e) {
+            LOG.warn("Nothing to cleanup with badeDir {} even though we 
expected there to be something there", baseDir);
+        }
+    }
+
+    @Override
+    public void completelyRemove() throws IOException {
+        Path fileWithVersion = getFilePathWithVersion();
+        Path currentSymLink = getCurrentSymlinkPath();
+
+        if (uncompressed) {
+            if (Files.exists(fileWithVersion)) {
+                // this doesn't follow symlinks, which is what we want
+                FileUtils.deleteDirectory(fileWithVersion.toFile());
+            }
+        } else {
+            Files.deleteIfExists(fileWithVersion);
+        }
+        Files.deleteIfExists(currentSymLink);
+        Files.deleteIfExists(versionFilePath);
+    }
+
+    @Override
+    public long getSizeOnDisk() {
+        return size;
+    }
+
+    @Override
+    public boolean isFullyDownloaded() {
+        return Files.exists(getFilePathWithVersion())
+            && Files.exists(getCurrentSymlinkPath())
+            && Files.exists(versionFilePath);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other instanceof LocalizedResource) {
+            LocalizedResource l = (LocalizedResource)other;
+            return getKey().equals(l.getKey()) && uncompressed == 
l.uncompressed && baseDir.equals(l.baseDir);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return getKey().hashCode() + Boolean.hashCode(uncompressed) + 
baseDir.hashCode();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 826bf98..936dbc1 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -15,23 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.localizer;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.nio.file.Path;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A set of resources that we can look at to see which ones we retain and 
which ones should be
@@ -39,24 +37,22 @@ import java.util.TreeMap;
  */
 public class LocalizedResourceRetentionSet {
     public static final Logger LOG = 
LoggerFactory.getLogger(LocalizedResourceRetentionSet.class);
-    private long delSize;
     private long currentSize;
     // targetSize in Bytes
     private long targetSize;
     @VisibleForTesting
-    final SortedMap<ComparableResource, CleanableResourceSet> noReferences;
-    private int resourceCount = 0;
+    final SortedMap<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>> noReferences;
 
     LocalizedResourceRetentionSet(long targetSize) {
         this(targetSize, new LRUComparator());
     }
 
-    LocalizedResourceRetentionSet(long targetSize, Comparator<? super 
ComparableResource> cmp) {
+    LocalizedResourceRetentionSet(long targetSize, Comparator<? super 
LocallyCachedBlob> cmp) {
         this(targetSize, new TreeMap<>(cmp));
     }
 
     LocalizedResourceRetentionSet(long targetSize,
-                                  SortedMap<ComparableResource, 
CleanableResourceSet> retain) {
+                                  SortedMap<LocallyCachedBlob, Map<String, ? 
extends LocallyCachedBlob>> retain) {
         this.noReferences = retain;
         this.targetSize = targetSize;
     }
@@ -66,287 +62,88 @@ public class LocalizedResourceRetentionSet {
         return noReferences.size();
     }
 
-    protected void addResourcesForSet(Iterator<LocalizedResource> setIter, 
LocalizedResourceSet set) {
-        CleanableLocalizedResourceSet cleanset = new 
CleanableLocalizedResourceSet(set);
-        for (Iterator<LocalizedResource> iter = setIter; setIter.hasNext(); ) {
-            LocalizedResource lrsrc = iter.next();
-            currentSize += lrsrc.getSize();
-            resourceCount ++;
-            if (lrsrc.getRefCount() > 0) {
-                // always retain resources in use
-                continue;
-            }
-            noReferences.put(new LocalizedBlobComparableResource(lrsrc), 
cleanset);
-        }
-    }
-
-    public void addResources(LocalizedResourceSet set) {
-        addResourcesForSet(set.getLocalFilesIterator(), set);
-        addResourcesForSet(set.getLocalArchivesIterator(), set);
-    }
-
-    public void addResources(ConcurrentHashMap<String, LocallyCachedBlob> 
blobs) {
-        CleanableLocalizedLocallyCachedBlob set = new 
CleanableLocalizedLocallyCachedBlob(blobs);
+    /**
+     * Add blobs to be checked if they can be deleted.
+     * @param blobs a map of blob name to the blob object.  The blobs in this 
map will be deleted from the map
+     *     if they are deleted on disk too.
+     */
+    public void addResources(ConcurrentMap<String, ? extends 
LocallyCachedBlob> blobs) {
         for (LocallyCachedBlob b: blobs.values()) {
             currentSize += b.getSizeOnDisk();
-            resourceCount ++;
             if (b.isUsed()) {
+                LOG.debug("NOT going to clean up {}, {} depends on it", 
b.getKey(), b.getDependencies());
                 // always retain resources in use
                 continue;
             }
-            LocallyCachedBlobComparableResource cb = new 
LocallyCachedBlobComparableResource(b);
-            noReferences.put(cb, set);
+            LOG.debug("Possibly going to clean up {} ts {} size {}", 
b.getKey(), b.getLastUsed(), b.getSizeOnDisk());
+            noReferences.put(b, blobs);
         }
     }
 
-    public void cleanup() {
+    /**
+     * Actually cleanup the blobs to try and get below the target cache size.
+     * @param store the blobs store client used to check if the blob has been 
deleted from the blobstore.  If it has, the blob will be
+     *     deleted even if the cache is not over the target size.
+     */
+    public void cleanup(ClientBlobStore store) {
         LOG.debug("cleanup target size: {} current size is: {}", targetSize, 
currentSize);
-        for (Iterator<Map.Entry<ComparableResource, CleanableResourceSet>> i =
-             noReferences.entrySet().iterator();
-             currentSize - delSize > targetSize && i.hasNext();) {
-            Map.Entry<ComparableResource, CleanableResourceSet> rsrc = 
i.next();
-            ComparableResource resource = rsrc.getKey();
-            CleanableResourceSet set = rsrc.getValue();
-            if (resource != null && set.remove(resource)) {
-                if (set.deleteUnderlyingResource(resource)) {
-                    delSize += resource.getSize();
-                    LOG.info("deleting: {} with size of: {}", 
resource.getNameForDebug(), resource.getSize());
+        long bytesOver = currentSize - targetSize;
+        //First delete everything that no longer exists...
+        for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>>> i = noReferences.entrySet().iterator();
+             i.hasNext();) {
+            Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>> rsrc = i.next();
+            LocallyCachedBlob resource = rsrc.getKey();
+            try {
+                resource.getRemoteVersion(store);
+            } catch (AuthorizationException e) {
+                //Ignored
+            } catch (KeyNotFoundException e) {
+                //The key was removed so we should delete it too.
+                Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue();
+                if (removeBlob(resource, set)) {
+                    bytesOver -= resource.getSizeOnDisk();
+                    LOG.info("Deleted blob: {} (KEY NOT FOUND).", 
resource.getKey());
                     i.remove();
-                } else {
-                    // since it failed to delete add it back so it gets retried
-                    set.add(resource.getKey(), resource);
                 }
             }
         }
-    }
-
-    @VisibleForTesting
-    public boolean deleteResource(CleanableResourceSet set, ComparableResource 
resource) {
-        return set.deleteUnderlyingResource(resource);
-    }
-
-    public long getCurrentSize() {
-        return currentSize;
-    }
-
-    public int getResourceCount() {
-        return resourceCount;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Cache: ").append(currentSize).append(", ");
-        sb.append("Deleted: ").append(delSize);
-        return sb.toString();
-    }
-
-    interface ComparableResource {
-        long getLastAccessTime();
-
-        long getSize();
-
-        String getNameForDebug();
-
-        String getKey();
-    }
-
-    interface CleanableResourceSet {
-        boolean remove(ComparableResource resource);
-
-        void add(String key, ComparableResource resource);
-
-        boolean deleteUnderlyingResource(ComparableResource resource);
-    }
-
-    public static class LocallyCachedBlobComparableResource implements 
ComparableResource {
-        private final LocallyCachedBlob blob;
-
-        public LocallyCachedBlobComparableResource(LocallyCachedBlob blob) {
-            this.blob = blob;
-        }
-
-        @Override
-        public long getLastAccessTime() {
-            return blob.getLastUsed();
-        }
-
-        @Override
-        public long getSize() {
-            return blob.getSizeOnDisk();
-        }
-
-        @Override
-        public String getNameForDebug() {
-            return blob.getKey();
-        }
-
-        @Override
-        public String getKey() {
-            return blob.getKey();
-        }
-
-        @Override
-        public String toString() {
-            return blob.toString();
-        }
 
-        @Override
-        public boolean equals(Object other) {
-            if (other instanceof LocallyCachedBlobComparableResource) {
-                return blob.equals(((LocallyCachedBlobComparableResource) 
other).blob);
+        for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>>> i = noReferences.entrySet().iterator();
+             bytesOver > 0 && i.hasNext();) {
+            Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>> rsrc = i.next();
+            LocallyCachedBlob resource = rsrc.getKey();
+            Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue();
+            if (removeBlob(resource, set)) {
+                bytesOver -= resource.getSizeOnDisk();
+                LOG.info("Deleted blob: {} (OVER SIZE LIMIT).", 
resource.getKey());
+                i.remove();
             }
-            return false;
-        }
-
-        @Override
-        public int hashCode() {
-            return blob.hashCode();
         }
     }
 
-    private static class CleanableLocalizedLocallyCachedBlob implements 
CleanableResourceSet {
-        private final ConcurrentHashMap<String, LocallyCachedBlob> blobs;
-
-        public CleanableLocalizedLocallyCachedBlob(ConcurrentHashMap<String, 
LocallyCachedBlob> blobs) {
-            this.blobs = blobs;
-        }
-
-        @Override
-        public boolean remove(ComparableResource resource) {
-            if (!(resource instanceof LocallyCachedBlobComparableResource)) {
-                throw new IllegalStateException(resource + " must be a 
LocallyCachedBlobComparableResource");
-            }
-            LocallyCachedBlob blob = 
((LocallyCachedBlobComparableResource)resource).blob;
-            synchronized (blob) {
-                if (!blob.isUsed()) {
-                    try {
-                        blob.completelyRemove();
-                    } catch (Exception e) {
-                        LOG.warn("Tried to remove {} but failed with", blob, 
e);
-                    }
-                    blobs.remove(blob.getKey());
-                    return true;
+    private boolean removeBlob(LocallyCachedBlob blob, Map<String, ? extends 
LocallyCachedBlob> blobs) {
+        synchronized (blob) {
+            if (!blob.isUsed()) {
+                try {
+                    blob.completelyRemove();
+                } catch (Exception e) {
+                    LOG.warn("Tried to remove {} but failed with", blob, e);
                 }
-                return false;
-            }
-        }
-
-        @Override
-        public void add(String key, ComparableResource resource) {
-            ///NOOP not used
-        }
-
-        @Override
-        public boolean deleteUnderlyingResource(ComparableResource resource) {
-            //NOOP not used
-            return true;
-        }
-    }
-
-    private static class LocalizedBlobComparableResource implements 
ComparableResource {
-        private final LocalizedResource resource;
-
-        private LocalizedBlobComparableResource(LocalizedResource resource) {
-            this.resource = resource;
-        }
-
-        @Override
-        public long getLastAccessTime() {
-            return resource.getLastAccessTime();
-        }
-
-        @Override
-        public long getSize() {
-            return resource.getSize();
-        }
-
-        @Override
-        public String getNameForDebug() {
-            return resource.getFilePath();
-        }
-
-        @Override
-        public String getKey() {
-            return resource.getKey();
-        }
-
-        @Override
-        public String toString() {
-            return resource.getKey() + " at " + 
resource.getFilePathWithVersion();
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (other instanceof LocalizedBlobComparableResource) {
-                return resource.equals(((LocalizedBlobComparableResource) 
other).resource);
+                blobs.remove(blob.getKey());
+                return true;
             }
             return false;
         }
-
-        @Override
-        public int hashCode() {
-            return resource.hashCode();
-        }
     }
 
-    private static class CleanableLocalizedResourceSet implements 
CleanableResourceSet {
-        private final LocalizedResourceSet set;
-
-        public CleanableLocalizedResourceSet(LocalizedResourceSet set) {
-            this.set = set;
-        }
-
-        @Override
-        public boolean remove(ComparableResource resource) {
-            if (!(resource instanceof LocalizedBlobComparableResource)) {
-                throw new IllegalStateException(resource + " must be a 
LocalizedBlobComparableResource");
-            }
-            return 
set.remove(((LocalizedBlobComparableResource)resource).resource);
-        }
-
-        @Override
-        public void add(String key, ComparableResource resource) {
-            if (!(resource instanceof LocalizedBlobComparableResource)) {
-                throw new IllegalStateException(resource + " must be a 
LocalizedBlobComparableResource");
-            }
-            LocalizedResource r = 
((LocalizedBlobComparableResource)resource).resource;
-            set.add(key, r, r.isUncompressed());
-        }
-
-        @Override
-        public boolean deleteUnderlyingResource(ComparableResource resource) {
-            if (resource instanceof LocalizedBlobComparableResource) {
-                LocalizedResource lr = ((LocalizedBlobComparableResource) 
resource).resource;
-                try {
-                    Path fileWithVersion = new 
File(lr.getFilePathWithVersion()).toPath();
-                    Path currentSymLink = new 
File(lr.getCurrentSymlinkPath()).toPath();
-                    Path versionFile = new 
File(lr.getVersionFilePath()).toPath();
-
-                    if (lr.isUncompressed()) {
-                        if (Files.exists(fileWithVersion)) {
-                            // this doesn't follow symlinks, which is what we 
want
-                            
FileUtils.deleteDirectory(fileWithVersion.toFile());
-                        }
-                    } else {
-                        Files.deleteIfExists(fileWithVersion);
-                    }
-                    Files.deleteIfExists(currentSymLink);
-                    Files.deleteIfExists(versionFile);
-                    return true;
-                } catch (IOException e) {
-                    LOG.warn("Could not delete: {}", 
resource.getNameForDebug(), e);
-                }
-                return false;
-            }  else {
-                throw new IllegalArgumentException("Don't know how to handle a 
" + resource.getClass());
-            }
-        }
+    @Override
+    public String toString() {
+        return "Cache: " + currentSize;
     }
 
-    static class LRUComparator implements Comparator<ComparableResource> {
-        public int compare(ComparableResource r1, ComparableResource r2) {
-            long ret = r1.getLastAccessTime() - r2.getLastAccessTime();
+    static class LRUComparator implements Comparator<LocallyCachedBlob> {
+        public int compare(LocallyCachedBlob r1, LocallyCachedBlob r2) {
+            long ret = r1.getLastUsed() - r2.getLastUsed();
             if (0 == ret) {
                 return System.identityHashCode(r1) - 
System.identityHashCode(r2);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceSet.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceSet.java
 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceSet.java
deleted file mode 100644
index 62d5b2f..0000000
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceSet.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.localizer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Set of localized resources for a specific user.
- */
-public class LocalizedResourceSet {
-
-  public static final Logger LOG = 
LoggerFactory.getLogger(LocalizedResourceSet.class);
-  // Key to LocalizedResource mapping for files
-  private final ConcurrentMap<String, LocalizedResource> _localrsrcFiles;
-  // Key to LocalizedResource mapping for files to be uncompressed
-  private final ConcurrentMap<String, LocalizedResource> _localrsrcArchives;
-  private String _user;
-
-  LocalizedResourceSet(String user) {
-    this._localrsrcFiles = new ConcurrentHashMap<String, LocalizedResource>();
-    this._localrsrcArchives = new ConcurrentHashMap<String, 
LocalizedResource>();
-    _user = user;
-  }
-
-  public String getUser() {
-    return _user;
-  }
-
-  public int getSize() {
-    return _localrsrcFiles.size() + _localrsrcArchives.size();
-  }
-
-  public LocalizedResource get(String name, boolean uncompress) {
-    if (uncompress) {
-      return _localrsrcArchives.get(name);
-    }
-    return _localrsrcFiles.get(name);
-  }
-
-  public void putIfAbsent(String resourceName, LocalizedResource 
updatedResource,
-                            boolean uncompress) {
-    if (uncompress) {
-      _localrsrcArchives.putIfAbsent(resourceName, updatedResource);
-    } else {
-      _localrsrcFiles.putIfAbsent(resourceName, updatedResource);
-    }
-  }
-
-  public void add(String resourceName, LocalizedResource newResource, boolean 
uncompress) {
-    if (uncompress) {
-      _localrsrcArchives.put(resourceName, newResource);
-    } else {
-      _localrsrcFiles.put(resourceName, newResource);
-    }
-  }
-
-  public boolean exists(String resourceName, boolean uncompress) {
-    if (uncompress) {
-      return _localrsrcArchives.containsKey(resourceName);
-    }
-    return _localrsrcFiles.containsKey(resourceName);
-  }
-
-  public boolean remove(LocalizedResource resource) {
-    LocalizedResource lrsrc = null;
-    if (resource.isUncompressed()) {
-      lrsrc = _localrsrcArchives.remove(resource.getKey());
-    } else {
-      lrsrc = _localrsrcFiles.remove(resource.getKey());
-    }
-    return (lrsrc != null);
-  }
-
-  public Iterator<LocalizedResource> getLocalFilesIterator() {
-    return _localrsrcFiles.values().iterator();
-  }
-
-  public Iterator<LocalizedResource> getLocalArchivesIterator() {
-    return _localrsrcArchives.values().iterator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index a287e95..1f7ee00 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -19,18 +19,24 @@
 package org.apache.storm.localizer;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.LinkOption;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,9 +47,10 @@ public abstract class LocallyCachedBlob {
     private static final Logger LOG = 
LoggerFactory.getLogger(LocallyCachedBlob.class);
     public static final long NOT_DOWNLOADED_VERSION = -1;
     // A callback that does nothing.
-    private static final BlobChangingCallback NOOP_CB = (assignment, port, 
resource, go) -> {};
+    private static final BlobChangingCallback NOOP_CB = (assignment, port, 
resource, go) -> {
+    };
 
-    private long lastUsed = System.currentTimeMillis();
+    private long lastUsed = Time.currentTimeMillis();
     private final Map<PortAndAssignment, BlobChangingCallback> references = 
new HashMap<>();
     private final String blobDescription;
     private final String blobKey;
@@ -51,8 +58,9 @@ public abstract class LocallyCachedBlob {
 
     /**
      * Create a new LocallyCachedBlob.
+     *
      * @param blobDescription a description of the blob this represents.  
Typically it should at least be the blob key, but ideally also
-     * include if it is an archive or not, what user or topology it is for, or 
if it is a storm.jar etc.
+     *     include if it is an archive or not, what user or topology it is 
for, or if it is a storm.jar etc.
      */
     protected LocallyCachedBlob(String blobDescription, String blobKey) {
         this.blobDescription = blobDescription;
@@ -60,26 +68,54 @@ public abstract class LocallyCachedBlob {
     }
 
     /**
-     * Get the version of the blob cached locally.  If the version is unknown 
or it has not been downloaded NOT_DOWNLOADED_VERSION
-     * should be returned.
-     * PRECONDITION: this can only be called with a lock on this instance held.
+     * Get the version of the blob cached locally.  If the version is unknown 
or it has not been downloaded NOT_DOWNLOADED_VERSION should be
+     * returned. PRECONDITION: this can only be called with a lock on this 
instance held.
      */
     public abstract long getLocalVersion();
 
     /**
-     * Get the version of the blob in the blob store.
-     * PRECONDITION: this can only be called with a lock on this instance held.
+     * Get the version of the blob in the blob store. PRECONDITION: this can 
only be called with a lock on this instance held.
      */
     public abstract long getRemoteVersion(ClientBlobStore store) throws 
KeyNotFoundException, AuthorizationException;
 
     /**
      * Download the latest version to a temp location. This may also include 
unzipping some or all of the data to a temp location.
      * PRECONDITION: this can only be called with a lock on this instance held.
+     *
      * @param store the store to us to download the data.
      * @return the version that was downloaded.
      */
     public abstract long downloadToTempLocation(ClientBlobStore store) throws 
IOException, KeyNotFoundException, AuthorizationException;
 
+    protected static long downloadToTempLocation(ClientBlobStore store, String 
key, long currentVersion, IAdvancedFSOps fsOps,
+                                          Function<Long, Path> getTempPath)
+        throws KeyNotFoundException, AuthorizationException, IOException {
+        try (InputStreamWithMeta in = store.getBlob(key)) {
+            long newVersion = in.getVersion();
+            if (newVersion == currentVersion) {
+                LOG.warn("The version did not change, but going to download 
again {} {}", currentVersion, key);
+            }
+            Path tmpLocation = getTempPath.apply(newVersion);
+            long totalRead = 0;
+            //Make sure the parent directory is there and ready to go
+            fsOps.forceMkdir(tmpLocation.getParent());
+            try (OutputStream outStream = 
fsOps.getOutputStream(tmpLocation.toFile())) {
+                byte[] buffer = new byte[4096];
+                int read = 0;
+                while ((read = in.read(buffer)) > 0) {
+                    outStream.write(buffer, 0, read);
+                    totalRead += read;
+                }
+            }
+            long expectedSize = in.getFileLength();
+            if (totalRead != expectedSize) {
+                throw new IOException("We expected to download " + 
expectedSize + " bytes but found we got " + totalRead);
+            }
+
+            return newVersion;
+        }
+    }
+
     /**
      * Commit the new version and make it available for the end user.
      * PRECONDITION: uncompressToTempLocationIfNeeded will have been called.
@@ -110,32 +146,11 @@ public abstract class LocallyCachedBlob {
     public abstract long getSizeOnDisk();
 
     /**
-     * Updates the last updated time.  This should be called when references 
are added or removed.
-     */
-    private synchronized void touch() {
-        lastUsed = System.currentTimeMillis();
-    }
-
-    /**
-     * Get the last time that this used for LRU calculations.
-     */
-    public synchronized long getLastUsed() {
-        return lastUsed;
-    }
-
-    /**
-     * Return true if this blob is actively being used, else false (meaning it 
can be deleted, but might not be).
-     */
-    public synchronized boolean isUsed() {
-        return !references.isEmpty();
-    }
-
-    /**
      * Get the size of p in bytes.
      * @param p the path to read.
      * @return the size of p in bytes.
      */
-    protected long getSizeOnDisk(Path p) throws IOException {
+    protected static long getSizeOnDisk(Path p) throws IOException {
         if (!Files.exists(p)) {
             return 0;
         } else if (Files.isRegularFile(p)) {
@@ -156,6 +171,28 @@ public abstract class LocallyCachedBlob {
     }
 
     /**
+     * Updates the last updated time.  This should be called when references 
are added or removed.
+     */
+    protected synchronized void touch() {
+        lastUsed = Time.currentTimeMillis();
+        LOG.debug("Setting {} ts to {}", blobKey, lastUsed);
+    }
+
+    /**
+     * Get the last time that this used for LRU calculations.
+     */
+    public synchronized long getLastUsed() {
+        return lastUsed;
+    }
+
+    /**
+     * Return true if this blob is actively being used, else false (meaning it 
can be deleted, but might not be).
+     */
+    public synchronized boolean isUsed() {
+        return !references.isEmpty();
+    }
+
+    /**
      * Mark that a given port and assignment are using this.
      * @param pna the slot and assignment that are using this blob.
      * @param cb an optional callback indicating that they want to 
know/synchronize when a blob is updated.
@@ -177,6 +214,7 @@ public abstract class LocallyCachedBlob {
         if (references.remove(pna) == null) {
             LOG.warn("{} had no reservation for {}", pna, blobDescription);
         }
+        touch();
     }
 
     /**
@@ -217,4 +255,11 @@ public abstract class LocallyCachedBlob {
     public String getKey() {
         return blobKey;
     }
+
+
+    public Collection<PortAndAssignment> getDependencies() {
+        return references.keySet();
+    }
+
+    public abstract boolean isFullyDownloaded();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
 
b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
index 35371b5..68415e1 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
@@ -115,14 +115,13 @@ public class LocallyCachedTopologyBlob extends 
LocallyCachedBlob {
         public String getTempExtractionDir(long version) {
             return extractionDir + "." + version;
         }
-    };
+    }
 
     private final TopologyBlobType type;
     private final String topologyId;
     private final boolean isLocalMode;
     private final Path topologyBasicBlobsRootDir;
     private final AdvancedFSOps fsOps;
-    private final Map<String, Object> conf;
     private volatile long version = NOT_DOWNLOADED_VERSION;
     private volatile long size = 0;
 
@@ -139,7 +138,6 @@ public class LocallyCachedTopologyBlob extends 
LocallyCachedBlob {
         this.type = type;
         this.isLocalMode = isLocalMode;
         this.fsOps = fsOps;
-        this.conf = conf;
         topologyBasicBlobsRootDir = 
Paths.get(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
         readVersion();
         updateSizeOnDisk();
@@ -203,31 +201,11 @@ public class LocallyCachedTopologyBlob extends 
LocallyCachedBlob {
             return LOCAL_MODE_JAR_VERSION;
         }
 
-        long newVersion;
-        Path tmpLocation;
-        String key = type.getKey(topologyId);
-        try (InputStreamWithMeta in = store.getBlob(key)) {
-            newVersion = in.getVersion();
-            long expectedSize = in.getFileLength();
-            if (newVersion == version) {
-                throw new RuntimeException("The version did not change, but we 
tried to download it. " + version + " " + key);
-            }
-            tmpLocation = 
topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion));
-            long totalRead = 0;
-            //Make sure the parent directory is there and ready to go
-            fsOps.forceMkdir(tmpLocation.getParent());
-            try (OutputStream outStream = 
fsOps.getOutputStream(tmpLocation.toFile())) {
-                byte [] buffer = new byte[4096];
-                int read = 0;
-                while ((read = in.read(buffer)) > 0) {
-                    outStream.write(buffer, 0, read);
-                    totalRead += read;
-                }
-            }
-            if (totalRead != expectedSize) {
-                throw new IOException("We expected to download " + 
expectedSize + " bytes but found we got " + totalRead);
-            }
-        }
+
+        long newVersion = downloadToTempLocation(store, 
type.getKey(topologyId), version, fsOps,
+            (version) -> 
topologyBasicBlobsRootDir.resolve(type.getTempFileName(version)));
+
+        Path tmpLocation = 
topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion));
 
         if (type.needsExtraction()) {
             Path extractionDest = 
topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(newVersion));
@@ -247,10 +225,10 @@ public class LocallyCachedTopologyBlob extends 
LocallyCachedBlob {
                 String name = entry.getName();
                 if (!entry.isDirectory() && name.startsWith(toRemove)) {
                     String shortenedName = name.replace(toRemove, "");
-                    Path aFile = dest.resolve(shortenedName);
-                    LOG.debug("EXTRACTING {} SHORTENED to {} into {}", name, 
shortenedName, aFile);
-                    fsOps.forceMkdir(aFile.getParent());
-                    try (FileOutputStream out = new 
FileOutputStream(aFile.toFile());
+                    Path targetFile = dest.resolve(shortenedName);
+                    LOG.debug("EXTRACTING {} SHORTENED to {} into {}", name, 
shortenedName, targetFile);
+                    fsOps.forceMkdir(targetFile.getParent());
+                    try (FileOutputStream out = new 
FileOutputStream(targetFile.toFile());
                          InputStream in = jarFile.getInputStream(entry)) {
                         IOUtils.copy(in, out);
                     }
@@ -260,6 +238,21 @@ public class LocallyCachedTopologyBlob extends 
LocallyCachedBlob {
     }
 
     @Override
+    public boolean isFullyDownloaded() {
+        Path versionFile = 
topologyBasicBlobsRootDir.resolve(type.getVersionFileName());
+        boolean ret = Files.exists(versionFile);
+        Path dest = topologyBasicBlobsRootDir.resolve(type.getFileName());
+        if (!(isLocalMode && type == TopologyBlobType.TOPO_JAR)) {
+            ret = ret && Files.exists(dest);
+        }
+        if (type.needsExtraction()) {
+            Path extractionDest = 
topologyBasicBlobsRootDir.resolve(type.getExtractionDir());
+            ret = ret && Files.exists(extractionDest);
+        }
+        return ret;
+    }
+
+    @Override
     public void commitNewVersion(long newVersion) throws IOException {
         //This is not atomic (so if something bad happens in the middle we 
need to be able to recover
         Path tempLoc = 
topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion));
@@ -325,6 +318,7 @@ public class LocallyCachedTopologyBlob extends 
LocallyCachedBlob {
         if (type.needsExtraction()) {
             removeAll(type.getExtractionDir());
         }
+        touch();
     }
 
     private void removeAll(String baseName) throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java 
b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java
index 081c811..bd92173 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java
@@ -21,7 +21,7 @@ package org.apache.storm.localizer;
 import org.apache.storm.generated.LocalAssignment;
 
 /**
- * A Port and a LocalAssignment used to reference count Resources
+ * A Port and a LocalAssignment used to reference count resources.
  */
 class PortAndAssignment {
     private final int port;
@@ -45,6 +45,10 @@ class PortAndAssignment {
         return assignment.get_topology_id();
     }
 
+    public String getOwner() {
+        return assignment.get_owner();
+    }
+
     @Override
     public int hashCode() {
         return (17 * port) + assignment.hashCode();
@@ -52,7 +56,7 @@ class PortAndAssignment {
 
     @Override
     public String toString() {
-        return "{" + port + " " + assignment + "}";
+        return "{" + assignment.get_topology_id() + " on " + port + "}";
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java 
b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 6a4454a..340db1c 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -82,10 +82,7 @@ public class ServerUtils {
     public static final Logger LOG = 
LoggerFactory.getLogger(ServerUtils.class);
 
     public static final boolean IS_ON_WINDOWS = 
"Windows_NT".equals(System.getenv("OS"));
-    public static final String CURRENT_BLOB_SUFFIX_ID = "current";
 
-    public static final String DEFAULT_CURRENT_BLOB_SUFFIX = "." + 
CURRENT_BLOB_SUFFIX_ID;
-    public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
     public static final int SIGKILL = 9;
     public static final int SIGTERM = 15;
 
@@ -166,14 +163,6 @@ public class ServerUtils {
         return StringUtils.join(changedCommands, " ");
     }
 
-    public static String constructVersionFileName(String fileName) {
-        return fileName + DEFAULT_BLOB_VERSION_SUFFIX;
-    }
-
-    public static String constructBlobCurrentSymlinkName(String fileName) {
-        return fileName + DEFAULT_CURRENT_BLOB_SUFFIX;
-    }
-
     /**
      * Takes an input dir or file and returns the disk usage on that local 
directory.
      * Very basic implementation.
@@ -206,14 +195,6 @@ public class ServerUtils {
         }
     }
 
-    public static long localVersionOfBlob(String localFile) {
-        return Utils.getVersionFromBlobVersionFile(new File(localFile + 
DEFAULT_BLOB_VERSION_SUFFIX));
-    }
-
-    public static String constructBlobWithVersionFileName(String fileName, 
long version) {
-        return fileName + "." + version;
-    }
-
     public static ClientBlobStore getClientBlobStoreForSupervisor(Map<String, 
Object> conf) {
         ClientBlobStore store;
         if (ConfigUtils.isLocalMode(conf)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/test/java/org/apache/storm/TestingTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TestingTest.java 
b/storm-server/src/test/java/org/apache/storm/TestingTest.java
new file mode 100644
index 0000000..201b470
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/TestingTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.testing.CompleteTopologyParam;
+import org.apache.storm.testing.FixedTuple;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.MockedSources;
+import org.apache.storm.testing.TestAggregatesCounter;
+import org.apache.storm.testing.TestGlobalCount;
+import org.apache.storm.testing.TestJob;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test that the testing class does what it should do.
+ */
+public class TestingTest {
+
+    private static final TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
+        TopologyBuilder tb = new TopologyBuilder();
+        tb.setSpout("spout", new TestWordSpout(true), 3);
+        tb.setBolt("2", new TestWordCounter(), 4)
+            .fieldsGrouping("spout", new Fields("word"));
+        tb.setBolt("3", new TestGlobalCount())
+            .globalGrouping("spout");
+        tb.setBolt("4", new TestAggregatesCounter())
+            .globalGrouping("2");
+
+        MockedSources mocked = new MockedSources();
+        mocked.addMockData("spout",
+            new Values("nathan"),
+            new Values("bob"),
+            new Values("joey"),
+            new Values("nathan"));
+
+        Config topoConf = new Config();
+        topoConf.setNumWorkers(2);
+
+        CompleteTopologyParam ctp = new CompleteTopologyParam();
+        ctp.setMockedSources(mocked);
+        ctp.setStormConf(topoConf);
+
+        Map<String, List<FixedTuple>> results = 
Testing.completeTopology(cluster, tb.createTopology(), ctp);
+        List<List<Object>> spoutTuples = Testing.readTuples(results, "spout");
+        List<List<Object>> expectedSpoutTuples = 
Arrays.asList(Arrays.asList("nathan"), Arrays.asList("bob"), 
Arrays.asList("joey"),
+            Arrays.asList("nathan"));
+        assertTrue(expectedSpoutTuples + " expected, but found " + spoutTuples,
+            Testing.multiseteq(expectedSpoutTuples, spoutTuples));
+
+        List<List<Object>> twoTuples = Testing.readTuples(results, "2");
+        List<List<Object>> expectedTwoTuples = 
Arrays.asList(Arrays.asList("nathan", 1), Arrays.asList("nathan", 2),
+            Arrays.asList("bob", 1), Arrays.asList("joey", 1));
+        assertTrue(expectedTwoTuples + " expected, but found " + twoTuples,
+            Testing.multiseteq(expectedTwoTuples, twoTuples));
+
+        List<List<Object>> threeTuples = Testing.readTuples(results, "3");
+        List<List<Object>> expectedThreeTuples = 
Arrays.asList(Arrays.asList(1), Arrays.asList(2),
+            Arrays.asList(3), Arrays.asList(4));
+        assertTrue(expectedThreeTuples + " expected, but found " + threeTuples,
+            Testing.multiseteq(expectedThreeTuples, threeTuples));
+
+        List<List<Object>> fourTuples = Testing.readTuples(results, "4");
+        List<List<Object>> expectedFourTuples = 
Arrays.asList(Arrays.asList(1), Arrays.asList(2),
+            Arrays.asList(3), Arrays.asList(4));
+        assertTrue(expectedFourTuples + " expected, but found " + fourTuples,
+            Testing.multiseteq(expectedFourTuples, fourTuples));
+    };
+
+    @Test
+    @Category(IntegrationTest.class)
+    public void testCompleteTopologyNettySimulated() throws Exception {
+        Config daemonConf = new Config();
+        daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
+        MkClusterParam param = new MkClusterParam();
+        param.setSupervisors(4);
+        param.setDaemonConf(daemonConf);
+
+        Testing.withSimulatedTimeLocalCluster(param, 
COMPLETE_TOPOLOGY_TESTJOB);
+    }
+
+    @Test
+    @Category(IntegrationTest.class)
+    public void testCompleteTopologyNetty() throws Exception {
+        Config daemonConf = new Config();
+        daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
+        MkClusterParam param = new MkClusterParam();
+        param.setSupervisors(4);
+        param.setDaemonConf(daemonConf);
+
+        Testing.withLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
+    }
+
+    @Test
+    @Category(IntegrationTest.class)
+    public void testCompleteTopologyLocalSimulated() throws Exception {
+        MkClusterParam param = new MkClusterParam();
+        param.setSupervisors(4);
+
+        Testing.withSimulatedTimeLocalCluster(param, 
COMPLETE_TOPOLOGY_TESTJOB);
+    }
+
+    @Test
+    @Category(IntegrationTest.class)
+    public void testCompleteTopologyLocal() throws Exception {
+        MkClusterParam param = new MkClusterParam();
+        param.setSupervisors(4);
+
+        Testing.withLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
+    }
+
+}
\ No newline at end of file

Reply via email to