http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java index f38f6f6..862349b 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java @@ -15,30 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.localizer; /** - * Local Resource requested by the topology + * Local Resource requested by the topology. */ public class LocalResource { - private String _blobKey; - private boolean _uncompress; + private final boolean needsCallback; + private final String blobKey; + private final boolean uncompress; + + /** + * Constructor. + * @param keyname the key of the blob to download. + * @param uncompress should the blob be uncompressed or not. + * @param needsCallback if the blob changes, should a callback happen so the worker is restarted. + */ + public LocalResource(String keyname, boolean uncompress, boolean needsCallback) { + blobKey = keyname; + this.uncompress = uncompress; + this.needsCallback = needsCallback; + } - public LocalResource(String keyname, boolean uncompress) { - _blobKey = keyname; - _uncompress = uncompress; - } + public String getBlobName() { + return blobKey; + } - public String getBlobName() { - return _blobKey; - } + public boolean shouldUncompress() { + return uncompress; + } - public boolean shouldUncompress() { - return _uncompress; - } + public boolean needsCallback() { + return needsCallback; + } - @Override - public String toString() { - return "Key: " + _blobKey + " uncompress: " + _uncompress; - } + @Override + public String toString() { + return "Key: " + blobKey + " uncompress: " + uncompress; + } }
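For reference, a minimal sketch (not part of this patch) of how the reworked three-argument constructor can be used. The new third argument records whether a change to the blob should trigger a callback so the worker is restarted. The settings-map keys "uncompress" and "workerRestart" below are illustrative placeholders for whatever per-blob options the caller already tracks, not keys defined by this commit.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.storm.localizer.LocalResource;

    // Sketch only: turn a per-blob settings map into LocalResource requests.
    class LocalResourceSketch {
        static List<LocalResource> fromSettings(Map<String, Map<String, Object>> blobSettings) {
            List<LocalResource> requests = new ArrayList<>();
            for (Map.Entry<String, Map<String, Object>> entry : blobSettings.entrySet()) {
                Map<String, Object> settings = entry.getValue();
                boolean uncompress = Boolean.TRUE.equals(settings.get("uncompress"));       // assumed key name
                boolean needsCallback = Boolean.TRUE.equals(settings.get("workerRestart")); // assumed key name
                requests.add(new LocalResource(entry.getKey(), uncompress, needsCallback));
            }
            return requests;
        }
    }

Callers that do not need a restart when the blob changes can simply pass false for the last argument.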
http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java index 7241976..4f01c30 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java @@ -15,129 +15,434 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.localizer; +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; + +import com.google.common.annotations.VisibleForTesting; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.blobstore.InputStreamWithMeta; +import org.apache.storm.daemon.supervisor.IAdvancedFSOps; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.KeyNotFoundException; +import org.apache.storm.generated.ReadableBlobMeta; +import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ServerUtils; +import org.apache.storm.utils.ShellUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - /** * Represents a resource that is localized on the supervisor. * A localized resource has a .current symlink to the current version file which is named * filename.{current version}. There is also a filename.version which contains the latest version. 
*/ -public class LocalizedResource { - public static final Logger LOG = LoggerFactory.getLogger(LocalizedResource.class); - - // filesystem path to the resource - private final String _localPath; - private final String _versionFilePath; - private final String _symlinkPath; - private final String _key; - private final boolean _uncompressed; - // _size of the resource - private long _size = -1; - // queue of topologies referencing resource - private final Set<String> _ref; - // last access time of the resource -> increment when topology finishes using it - private final AtomicLong _lastAccessTime = new AtomicLong(currentTime()); - - public LocalizedResource(String key, String fileLoc, boolean uncompressed) { - _ref = new HashSet<String>(); - _localPath = fileLoc; - _versionFilePath = ServerUtils.constructVersionFileName(fileLoc); - _symlinkPath = ServerUtils.constructBlobCurrentSymlinkName(fileLoc); - _uncompressed = uncompressed; - _key = key; - // we trust that the file exists - _size = ServerUtils.getDU(new File(getFilePathWithVersion())); - LOG.debug("size of {} is: {}", fileLoc, _size); - } - - // create local resource and add reference - public LocalizedResource(String key, String fileLoc, boolean uncompressed, String topo) { - this(key, fileLoc, uncompressed); - _ref.add(topo); - } - - public boolean isUncompressed() { - return _uncompressed; - } - - public String getKey() { - return _key; - } - - public String getCurrentSymlinkPath() { - return _symlinkPath; - } - - public String getVersionFilePath() { - return _versionFilePath; - } - - public String getFilePathWithVersion() { - long version = ServerUtils.localVersionOfBlob(_localPath); - return ServerUtils.constructBlobWithVersionFileName(_localPath, version); - } - - public String getFilePath() { - return _localPath; - } - - public void addReference(String topo) { - _ref.add(topo); - } - - public void removeReference(String topo) { - if (!_ref.remove(topo)) { - LOG.warn("Tried to remove a reference to a topology that doesn't use this resource"); - } - setTimestamp(); - } - - // The last access time is only valid if the resource doesn't have any references. 
- public long getLastAccessTime() { - return _lastAccessTime.get(); - } - - // for testing - protected void setSize(long size) { - _size = size; - } - - public long getSize() { - return _size; - } - - private void setTimestamp() { - _lastAccessTime.set(currentTime()); - } - - public int getRefCount() { - return _ref.size(); - } - - private long currentTime() { - return System.nanoTime(); - } - - @Override - public boolean equals(Object other) { - if (other instanceof LocalizedResource) { - LocalizedResource l = (LocalizedResource)other; - return _key.equals(l._key) && _uncompressed == l._uncompressed && _localPath.equals(l._localPath); - } - return false; - } - - @Override - public int hashCode() { - return _key.hashCode() + Boolean.hashCode(_uncompressed) + _localPath.hashCode(); - } +public class LocalizedResource extends LocallyCachedBlob { + private static final Logger LOG = LoggerFactory.getLogger(LocalizedResource.class); + @VisibleForTesting + static final String CURRENT_BLOB_SUFFIX = ".current"; + @VisibleForTesting + static final String BLOB_VERSION_SUFFIX = ".version"; + @VisibleForTesting + static final String FILECACHE = "filecache"; + @VisibleForTesting + static final String USERCACHE = "usercache"; + // sub directories to store either files or uncompressed archives respectively + @VisibleForTesting + static final String FILESDIR = "files"; + @VisibleForTesting + static final String ARCHIVESDIR = "archives"; + private static final String TO_UNCOMPRESS = "_tmp_"; + + private static Path constructVersionFileName(Path baseDir, String key) { + return baseDir.resolve(key + BLOB_VERSION_SUFFIX); + } + + @VisibleForTesting + static long localVersionOfBlob(Path versionFile) { + long currentVersion = -1; + if (Files.exists(versionFile) && !(Files.isDirectory(versionFile))) { + try (BufferedReader br = new BufferedReader(new FileReader(versionFile.toFile()))) { + String line = br.readLine(); + currentVersion = Long.parseLong(line); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return currentVersion; + } + + private static Path constructBlobCurrentSymlinkName(Path baseDir, String key) { + return baseDir.resolve(key + CURRENT_BLOB_SUFFIX); + } + + private static Path constructBlobWithVersionFileName(Path baseDir, String key, long version) { + return baseDir.resolve(key + "." 
+ version); + } + + static Collection<String> getLocalizedUsers(Path localBaseDir) throws IOException { + Path userCacheDir = getUserCacheDir(localBaseDir); + if (!Files.exists(userCacheDir)) { + return Collections.emptyList(); + } + return Files.list(userCacheDir).map((p) -> p.getFileName().toString()).collect(Collectors.toList()); + } + + static void completelyRemoveUnusedUser(Path localBaseDir, String user) throws IOException { + Path userFileCacheDir = getLocalUserFileCacheDir(localBaseDir, user); + // baseDir/supervisor/usercache/user1/filecache/files + Files.deleteIfExists(getCacheDirForFiles(userFileCacheDir)); + // baseDir/supervisor/usercache/user1/filecache/archives + Files.deleteIfExists(getCacheDirForArchives(userFileCacheDir)); + // baseDir/supervisor/usercache/user1/filecache + Files.deleteIfExists(userFileCacheDir); + // baseDir/supervisor/usercache/user1 + Files.deleteIfExists(getLocalUserDir(localBaseDir, user)); + } + + static List<String> getLocalizedArchiveKeys(Path localBaseDir, String user) throws IOException { + Path dir = getCacheDirForArchives(getLocalUserFileCacheDir(localBaseDir, user)); + return readKeysFromDir(dir); + } + + static List<String> getLocalizedFileKeys(Path localBaseDir, String user) throws IOException { + Path dir = getCacheDirForFiles(getLocalUserFileCacheDir(localBaseDir, user)); + return readKeysFromDir(dir); + } + + // Looks for files in the directory with .current suffix + private static List<String> readKeysFromDir(Path dir) throws IOException { + if (!Files.exists(dir)) { + return Collections.emptyList(); + } + return Files.list(dir) + .map((p) -> p.getFileName().toString()) + .filter((name) -> name.toLowerCase().endsWith(CURRENT_BLOB_SUFFIX)) + .map((key) -> { + int p = key.lastIndexOf('.'); + if (p > 0) { + key = key.substring(0, p); + } + return key; + }) + .collect(Collectors.toList()); + } + + // baseDir/supervisor/usercache/ + private static Path getUserCacheDir(Path localBaseDir) { + return localBaseDir.resolve(USERCACHE); + } + + // baseDir/supervisor/usercache/user1/ + static Path getLocalUserDir(Path localBaseDir, String userName) { + return getUserCacheDir(localBaseDir).resolve(userName); + } + + // baseDir/supervisor/usercache/user1/filecache + static Path getLocalUserFileCacheDir(Path localBaseDir, String userName) { + return getLocalUserDir(localBaseDir, userName).resolve(FILECACHE); + } + + // baseDir/supervisor/usercache/user1/filecache/files + private static Path getCacheDirForFiles(Path dir) { + return dir.resolve(FILESDIR); + } + + // get the directory to put uncompressed archives in + // baseDir/supervisor/usercache/user1/filecache/archives + private static Path getCacheDirForArchives(Path dir) { + return dir.resolve(ARCHIVESDIR); + } + + // filesystem path to the resource + private final Path baseDir; + private final Path versionFilePath; + private final Path symlinkPath; + private final boolean uncompressed; + private final IAdvancedFSOps fsOps; + private final String user; + // size of the resource + private long size = -1; + private final Map<String, Object> conf; + + LocalizedResource(String key, Path localBaseDir, boolean uncompressed, IAdvancedFSOps fsOps, Map<String, Object> conf, + String user) { + super(key + (uncompressed ? " archive" : " file"), key); + Path base = getLocalUserFileCacheDir(localBaseDir, user); + this.baseDir = uncompressed ? 
getCacheDirForArchives(base) : getCacheDirForFiles(base); + this.conf = conf; + this.user = user; + this.fsOps = fsOps; + versionFilePath = constructVersionFileName(baseDir, key); + symlinkPath = constructBlobCurrentSymlinkName(baseDir, key); + this.uncompressed = uncompressed; + //Set the size in case we are recovering an already downloaded object + setSize(); + } + + Path getCurrentSymlinkPath() { + return symlinkPath; + } + + @VisibleForTesting + Path getFilePathWithVersion() { + return constructBlobWithVersionFileName(baseDir, getKey(), getLocalVersion()); + } + + private void setSize() { + // we trust that the file exists + Path withVersion = getFilePathWithVersion(); + size = ServerUtils.getDU(withVersion.toFile()); + LOG.debug("size of {} is: {}", withVersion, size); + } + + @VisibleForTesting + protected void setSize(long size) { + this.size = size; + } + + @Override + public long getLocalVersion() { + return localVersionOfBlob(versionFilePath); + } + + @Override + public long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException { + return ServerUtils.nimbusVersionOfBlob(getKey(), store); + } + + @Override + public long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException { + String key = getKey(); + ReadableBlobMeta meta = store.getBlobMeta(key); + if (!ServerUtils.canUserReadBlob(meta, user, conf)) { + throw new AuthorizationException(user + " does not have READ access to " + key); + } + long version; + Path downloadFile; + Path finalLocation; + try (InputStreamWithMeta in = store.getBlob(key)) { + version = in.getVersion(); + finalLocation = constructBlobWithVersionFileName(baseDir, getKey(), version); + if (uncompressed) { + // we need to download to temp file and then unpack into the one requested + downloadFile = tmpOutputLocation(); + } else { + downloadFile = finalLocation; + } + byte[] buffer = new byte[1024]; + int len; + LOG.debug("Downloading {} to {}", key, downloadFile); + Path parent = downloadFile.getParent(); + if (!Files.exists(parent)) { + Files.createDirectory(parent); + } + try (FileOutputStream out = new FileOutputStream(downloadFile.toFile())) { + while ((len = in.read(buffer)) >= 0) { + out.write(buffer, 0, len); + } + } + } + if (uncompressed) { + ServerUtils.unpack(downloadFile.toFile(), finalLocation.toFile()); + LOG.debug("Uncompressed {} to: {}", downloadFile, finalLocation); + } + setBlobPermissions(conf, user, finalLocation); + return version; + } + + @Override + public void commitNewVersion(long version) throws IOException { + String key = getKey(); + LOG.info("Blob: {} updated to version {} from version {}", key, version, getLocalVersion()); + Path localVersionFile = versionFilePath; + // The false parameter ensures overwriting the version file, not appending + try (PrintWriter writer = new PrintWriter( + new BufferedWriter(new FileWriter(localVersionFile.toFile(), false)))) { + writer.println(version); + } + setBlobPermissions(conf, user, localVersionFile); + + // Update the key.current symlink. First create tmp symlink and do + // move of tmp to current so that the operation is atomic. 
+ Path tmpSymlink = tmpSymlinkLocation(); + Path targetOfSymlink = constructBlobWithVersionFileName(baseDir, getKey(), version); + LOG.debug("Creating a symlink @{} linking to: {}", tmpSymlink, targetOfSymlink); + Files.createSymbolicLink(tmpSymlink, targetOfSymlink); + + Path currentSymLink = getCurrentSymlinkPath(); + Files.move(tmpSymlink, currentSymLink, ATOMIC_MOVE); + //Update the size of the objects + setSize(); + } + + private void setBlobPermissions(Map<String, Object> conf, String user, Path path) + throws IOException { + + if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + return; + } + String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), ""); + if (wlCommand.isEmpty()) { + String stormHome = System.getProperty("storm.home"); + wlCommand = stormHome + "/bin/worker-launcher"; + } + List<String> command = new ArrayList<>(Arrays.asList(wlCommand, user, "blob", path.toString())); + + String[] commandArray = command.toArray(new String[command.size()]); + ShellUtils.ShellCommandExecutor shExec = new ShellUtils.ShellCommandExecutor(commandArray); + LOG.debug("Setting blob permissions, command: {}", Arrays.toString(commandArray)); + + try { + shExec.execute(); + LOG.debug("output: {}", shExec.getOutput()); + } catch (ShellUtils.ExitCodeException e) { + int exitCode = shExec.getExitCode(); + LOG.warn("Exit code from worker-launcher is: {}", exitCode, e); + LOG.debug("output: {}", shExec.getOutput()); + throw new IOException("Setting blob permissions failed" + + " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e); + } + } + + private Path tmpOutputLocation() { + return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + getKey())); + } + + private Path tmpSymlinkLocation() { + return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + getKey() + CURRENT_BLOB_SUFFIX)); + } + + private static final Pattern VERSION_FILE_PATTERN = Pattern.compile("^(.+)\\.(\\d+)$"); + + @Override + public void cleanupOrphanedData() throws IOException { + //There are a few possible files that we would want to clean up + //baseDir + "/" + "_tmp_" + baseName + //baseDir + "/" + "_tmp_" + baseName + ".current" + //baseDir + "/" + baseName.<VERSION> + //baseDir + "/" + baseName.current + //baseDir + "/" + baseName.version + //In general we always want to delete the _tmp_ files if they are there. + + Path tmpOutput = tmpOutputLocation(); + Files.deleteIfExists(tmpOutput); + Path tmpSym = tmpSymlinkLocation(); + Files.deleteIfExists(tmpSym); + + try { + String baseName = getKey(); + long version = getLocalVersion(); + Path current = getCurrentSymlinkPath(); + + //If .current and .version do not match, we roll back the .version file to match + // what .current is pointing to. 
+ if (Files.exists(current) && Files.isSymbolicLink(current)) { + Path versionFile = Files.readSymbolicLink(current); + Matcher m = VERSION_FILE_PATTERN.matcher(versionFile.getFileName().toString()); + if (m.matches()) { + long foundVersion = Long.valueOf(m.group(2)); + if (foundVersion != version) { + LOG.error("{} does not match the version file so fix the version file", current); + //The versions are different so roll back to whatever current is + try (PrintWriter restoreWriter = new PrintWriter( + new BufferedWriter(new FileWriter(versionFilePath.toFile(), false)))) { + restoreWriter.println(foundVersion); + } + version = foundVersion; + } + } + } + + // Finally delete any baseName.<VERSION> files that are not pointed to by the current version + final long finalVersion = version; + LOG.debug("Looking to clean up after {} in {}", getKey(), baseDir); + try (DirectoryStream<Path> ds = fsOps.newDirectoryStream(baseDir, (path) -> { + Matcher m = VERSION_FILE_PATTERN.matcher(path.getFileName().toString()); + if (m.matches()) { + long foundVersion = Long.valueOf(m.group(2)); + return m.group(1).equals(baseName) && foundVersion != finalVersion; + } + return false; + })) { + for (Path p : ds) { + LOG.info("Cleaning up old localized resource file {}", p); + if (Files.isDirectory(p)) { + FileUtils.deleteDirectory(p.toFile()); + } else { + fsOps.deleteIfExists(p.toFile()); + } + } + } + } catch (NoSuchFileException e) { + LOG.warn("Nothing to cleanup with badeDir {} even though we expected there to be something there", baseDir); + } + } + + @Override + public void completelyRemove() throws IOException { + Path fileWithVersion = getFilePathWithVersion(); + Path currentSymLink = getCurrentSymlinkPath(); + + if (uncompressed) { + if (Files.exists(fileWithVersion)) { + // this doesn't follow symlinks, which is what we want + FileUtils.deleteDirectory(fileWithVersion.toFile()); + } + } else { + Files.deleteIfExists(fileWithVersion); + } + Files.deleteIfExists(currentSymLink); + Files.deleteIfExists(versionFilePath); + } + + @Override + public long getSizeOnDisk() { + return size; + } + + @Override + public boolean isFullyDownloaded() { + return Files.exists(getFilePathWithVersion()) + && Files.exists(getCurrentSymlinkPath()) + && Files.exists(versionFilePath); + } + + @Override + public boolean equals(Object other) { + if (other instanceof LocalizedResource) { + LocalizedResource l = (LocalizedResource)other; + return getKey().equals(l.getKey()) && uncompressed == l.uncompressed && baseDir.equals(l.baseDir); + } + return false; + } + + @Override + public int hashCode() { + return getKey().hashCode() + Boolean.hashCode(uncompressed) + baseDir.hashCode(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java index 826bf98..936dbc1 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java @@ -15,23 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.localizer; import com.google.common.annotations.VisibleForTesting; -import java.nio.file.Path; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; import java.util.Comparator; import java.util.Iterator; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.KeyNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A set of resources that we can look at to see which ones we retain and which ones should be @@ -39,24 +37,22 @@ import java.util.TreeMap; */ public class LocalizedResourceRetentionSet { public static final Logger LOG = LoggerFactory.getLogger(LocalizedResourceRetentionSet.class); - private long delSize; private long currentSize; // targetSize in Bytes private long targetSize; @VisibleForTesting - final SortedMap<ComparableResource, CleanableResourceSet> noReferences; - private int resourceCount = 0; + final SortedMap<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> noReferences; LocalizedResourceRetentionSet(long targetSize) { this(targetSize, new LRUComparator()); } - LocalizedResourceRetentionSet(long targetSize, Comparator<? super ComparableResource> cmp) { + LocalizedResourceRetentionSet(long targetSize, Comparator<? super LocallyCachedBlob> cmp) { this(targetSize, new TreeMap<>(cmp)); } LocalizedResourceRetentionSet(long targetSize, - SortedMap<ComparableResource, CleanableResourceSet> retain) { + SortedMap<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> retain) { this.noReferences = retain; this.targetSize = targetSize; } @@ -66,287 +62,88 @@ public class LocalizedResourceRetentionSet { return noReferences.size(); } - protected void addResourcesForSet(Iterator<LocalizedResource> setIter, LocalizedResourceSet set) { - CleanableLocalizedResourceSet cleanset = new CleanableLocalizedResourceSet(set); - for (Iterator<LocalizedResource> iter = setIter; setIter.hasNext(); ) { - LocalizedResource lrsrc = iter.next(); - currentSize += lrsrc.getSize(); - resourceCount ++; - if (lrsrc.getRefCount() > 0) { - // always retain resources in use - continue; - } - noReferences.put(new LocalizedBlobComparableResource(lrsrc), cleanset); - } - } - - public void addResources(LocalizedResourceSet set) { - addResourcesForSet(set.getLocalFilesIterator(), set); - addResourcesForSet(set.getLocalArchivesIterator(), set); - } - - public void addResources(ConcurrentHashMap<String, LocallyCachedBlob> blobs) { - CleanableLocalizedLocallyCachedBlob set = new CleanableLocalizedLocallyCachedBlob(blobs); + /** + * Add blobs to be checked if they can be deleted. + * @param blobs a map of blob name to the blob object. The blobs in this map will be deleted from the map + * if they are deleted on disk too. + */ + public void addResources(ConcurrentMap<String, ? 
extends LocallyCachedBlob> blobs) { for (LocallyCachedBlob b: blobs.values()) { currentSize += b.getSizeOnDisk(); - resourceCount ++; if (b.isUsed()) { + LOG.debug("NOT going to clean up {}, {} depends on it", b.getKey(), b.getDependencies()); // always retain resources in use continue; } - LocallyCachedBlobComparableResource cb = new LocallyCachedBlobComparableResource(b); - noReferences.put(cb, set); + LOG.debug("Possibly going to clean up {} ts {} size {}", b.getKey(), b.getLastUsed(), b.getSizeOnDisk()); + noReferences.put(b, blobs); } } - public void cleanup() { + /** + * Actually cleanup the blobs to try and get below the target cache size. + * @param store the blobs store client used to check if the blob has been deleted from the blobstore. If it has, the blob will be + * deleted even if the cache is not over the target size. + */ + public void cleanup(ClientBlobStore store) { LOG.debug("cleanup target size: {} current size is: {}", targetSize, currentSize); - for (Iterator<Map.Entry<ComparableResource, CleanableResourceSet>> i = - noReferences.entrySet().iterator(); - currentSize - delSize > targetSize && i.hasNext();) { - Map.Entry<ComparableResource, CleanableResourceSet> rsrc = i.next(); - ComparableResource resource = rsrc.getKey(); - CleanableResourceSet set = rsrc.getValue(); - if (resource != null && set.remove(resource)) { - if (set.deleteUnderlyingResource(resource)) { - delSize += resource.getSize(); - LOG.info("deleting: {} with size of: {}", resource.getNameForDebug(), resource.getSize()); + long bytesOver = currentSize - targetSize; + //First delete everything that no longer exists... + for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>>> i = noReferences.entrySet().iterator(); + i.hasNext();) { + Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> rsrc = i.next(); + LocallyCachedBlob resource = rsrc.getKey(); + try { + resource.getRemoteVersion(store); + } catch (AuthorizationException e) { + //Ignored + } catch (KeyNotFoundException e) { + //The key was removed so we should delete it too. + Map<String, ? 
extends LocallyCachedBlob> set = rsrc.getValue(); + if (removeBlob(resource, set)) { + bytesOver -= resource.getSizeOnDisk(); + LOG.info("Deleted blob: {} (KEY NOT FOUND).", resource.getKey()); i.remove(); - } else { - // since it failed to delete add it back so it gets retried - set.add(resource.getKey(), resource); } } } - } - - @VisibleForTesting - public boolean deleteResource(CleanableResourceSet set, ComparableResource resource) { - return set.deleteUnderlyingResource(resource); - } - - public long getCurrentSize() { - return currentSize; - } - - public int getResourceCount() { - return resourceCount; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Cache: ").append(currentSize).append(", "); - sb.append("Deleted: ").append(delSize); - return sb.toString(); - } - - interface ComparableResource { - long getLastAccessTime(); - - long getSize(); - - String getNameForDebug(); - - String getKey(); - } - - interface CleanableResourceSet { - boolean remove(ComparableResource resource); - - void add(String key, ComparableResource resource); - - boolean deleteUnderlyingResource(ComparableResource resource); - } - - public static class LocallyCachedBlobComparableResource implements ComparableResource { - private final LocallyCachedBlob blob; - - public LocallyCachedBlobComparableResource(LocallyCachedBlob blob) { - this.blob = blob; - } - - @Override - public long getLastAccessTime() { - return blob.getLastUsed(); - } - - @Override - public long getSize() { - return blob.getSizeOnDisk(); - } - - @Override - public String getNameForDebug() { - return blob.getKey(); - } - - @Override - public String getKey() { - return blob.getKey(); - } - - @Override - public String toString() { - return blob.toString(); - } - @Override - public boolean equals(Object other) { - if (other instanceof LocallyCachedBlobComparableResource) { - return blob.equals(((LocallyCachedBlobComparableResource) other).blob); + for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>>> i = noReferences.entrySet().iterator(); + bytesOver > 0 && i.hasNext();) { + Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> rsrc = i.next(); + LocallyCachedBlob resource = rsrc.getKey(); + Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue(); + if (removeBlob(resource, set)) { + bytesOver -= resource.getSizeOnDisk(); + LOG.info("Deleted blob: {} (OVER SIZE LIMIT).", resource.getKey()); + i.remove(); } - return false; - } - - @Override - public int hashCode() { - return blob.hashCode(); } } - private static class CleanableLocalizedLocallyCachedBlob implements CleanableResourceSet { - private final ConcurrentHashMap<String, LocallyCachedBlob> blobs; - - public CleanableLocalizedLocallyCachedBlob(ConcurrentHashMap<String, LocallyCachedBlob> blobs) { - this.blobs = blobs; - } - - @Override - public boolean remove(ComparableResource resource) { - if (!(resource instanceof LocallyCachedBlobComparableResource)) { - throw new IllegalStateException(resource + " must be a LocallyCachedBlobComparableResource"); - } - LocallyCachedBlob blob = ((LocallyCachedBlobComparableResource)resource).blob; - synchronized (blob) { - if (!blob.isUsed()) { - try { - blob.completelyRemove(); - } catch (Exception e) { - LOG.warn("Tried to remove {} but failed with", blob, e); - } - blobs.remove(blob.getKey()); - return true; + private boolean removeBlob(LocallyCachedBlob blob, Map<String, ? 
extends LocallyCachedBlob> blobs) { + synchronized (blob) { + if (!blob.isUsed()) { + try { + blob.completelyRemove(); + } catch (Exception e) { + LOG.warn("Tried to remove {} but failed with", blob, e); } - return false; - } - } - - @Override - public void add(String key, ComparableResource resource) { - ///NOOP not used - } - - @Override - public boolean deleteUnderlyingResource(ComparableResource resource) { - //NOOP not used - return true; - } - } - - private static class LocalizedBlobComparableResource implements ComparableResource { - private final LocalizedResource resource; - - private LocalizedBlobComparableResource(LocalizedResource resource) { - this.resource = resource; - } - - @Override - public long getLastAccessTime() { - return resource.getLastAccessTime(); - } - - @Override - public long getSize() { - return resource.getSize(); - } - - @Override - public String getNameForDebug() { - return resource.getFilePath(); - } - - @Override - public String getKey() { - return resource.getKey(); - } - - @Override - public String toString() { - return resource.getKey() + " at " + resource.getFilePathWithVersion(); - } - - @Override - public boolean equals(Object other) { - if (other instanceof LocalizedBlobComparableResource) { - return resource.equals(((LocalizedBlobComparableResource) other).resource); + blobs.remove(blob.getKey()); + return true; } return false; } - - @Override - public int hashCode() { - return resource.hashCode(); - } } - private static class CleanableLocalizedResourceSet implements CleanableResourceSet { - private final LocalizedResourceSet set; - - public CleanableLocalizedResourceSet(LocalizedResourceSet set) { - this.set = set; - } - - @Override - public boolean remove(ComparableResource resource) { - if (!(resource instanceof LocalizedBlobComparableResource)) { - throw new IllegalStateException(resource + " must be a LocalizedBlobComparableResource"); - } - return set.remove(((LocalizedBlobComparableResource)resource).resource); - } - - @Override - public void add(String key, ComparableResource resource) { - if (!(resource instanceof LocalizedBlobComparableResource)) { - throw new IllegalStateException(resource + " must be a LocalizedBlobComparableResource"); - } - LocalizedResource r = ((LocalizedBlobComparableResource)resource).resource; - set.add(key, r, r.isUncompressed()); - } - - @Override - public boolean deleteUnderlyingResource(ComparableResource resource) { - if (resource instanceof LocalizedBlobComparableResource) { - LocalizedResource lr = ((LocalizedBlobComparableResource) resource).resource; - try { - Path fileWithVersion = new File(lr.getFilePathWithVersion()).toPath(); - Path currentSymLink = new File(lr.getCurrentSymlinkPath()).toPath(); - Path versionFile = new File(lr.getVersionFilePath()).toPath(); - - if (lr.isUncompressed()) { - if (Files.exists(fileWithVersion)) { - // this doesn't follow symlinks, which is what we want - FileUtils.deleteDirectory(fileWithVersion.toFile()); - } - } else { - Files.deleteIfExists(fileWithVersion); - } - Files.deleteIfExists(currentSymLink); - Files.deleteIfExists(versionFile); - return true; - } catch (IOException e) { - LOG.warn("Could not delete: {}", resource.getNameForDebug(), e); - } - return false; - } else { - throw new IllegalArgumentException("Don't know how to handle a " + resource.getClass()); - } - } + @Override + public String toString() { + return "Cache: " + currentSize; } - static class LRUComparator implements Comparator<ComparableResource> { - public int compare(ComparableResource r1, 
ComparableResource r2) { - long ret = r1.getLastAccessTime() - r2.getLastAccessTime(); + static class LRUComparator implements Comparator<LocallyCachedBlob> { + public int compare(LocallyCachedBlob r1, LocallyCachedBlob r2) { + long ret = r1.getLastUsed() - r2.getLastUsed(); if (0 == ret) { return System.identityHashCode(r1) - System.identityHashCode(r2); } http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceSet.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceSet.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceSet.java deleted file mode 100644 index 62d5b2f..0000000 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceSet.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.localizer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Set of localized resources for a specific user. 
- */ -public class LocalizedResourceSet { - - public static final Logger LOG = LoggerFactory.getLogger(LocalizedResourceSet.class); - // Key to LocalizedResource mapping for files - private final ConcurrentMap<String, LocalizedResource> _localrsrcFiles; - // Key to LocalizedResource mapping for files to be uncompressed - private final ConcurrentMap<String, LocalizedResource> _localrsrcArchives; - private String _user; - - LocalizedResourceSet(String user) { - this._localrsrcFiles = new ConcurrentHashMap<String, LocalizedResource>(); - this._localrsrcArchives = new ConcurrentHashMap<String, LocalizedResource>(); - _user = user; - } - - public String getUser() { - return _user; - } - - public int getSize() { - return _localrsrcFiles.size() + _localrsrcArchives.size(); - } - - public LocalizedResource get(String name, boolean uncompress) { - if (uncompress) { - return _localrsrcArchives.get(name); - } - return _localrsrcFiles.get(name); - } - - public void putIfAbsent(String resourceName, LocalizedResource updatedResource, - boolean uncompress) { - if (uncompress) { - _localrsrcArchives.putIfAbsent(resourceName, updatedResource); - } else { - _localrsrcFiles.putIfAbsent(resourceName, updatedResource); - } - } - - public void add(String resourceName, LocalizedResource newResource, boolean uncompress) { - if (uncompress) { - _localrsrcArchives.put(resourceName, newResource); - } else { - _localrsrcFiles.put(resourceName, newResource); - } - } - - public boolean exists(String resourceName, boolean uncompress) { - if (uncompress) { - return _localrsrcArchives.containsKey(resourceName); - } - return _localrsrcFiles.containsKey(resourceName); - } - - public boolean remove(LocalizedResource resource) { - LocalizedResource lrsrc = null; - if (resource.isUncompressed()) { - lrsrc = _localrsrcArchives.remove(resource.getKey()); - } else { - lrsrc = _localrsrcFiles.remove(resource.getKey()); - } - return (lrsrc != null); - } - - public Iterator<LocalizedResource> getLocalFilesIterator() { - return _localrsrcFiles.values().iterator(); - } - - public Iterator<LocalizedResource> getLocalArchivesIterator() { - return _localrsrcArchives.values().iterator(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java index a287e95..1f7ee00 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java @@ -19,18 +19,24 @@ package org.apache.storm.localizer; import java.io.IOException; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.LinkOption; import java.nio.file.Path; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.storm.blobstore.BlobStore; import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.blobstore.InputStreamWithMeta; +import org.apache.storm.daemon.supervisor.IAdvancedFSOps; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyNotFoundException; +import 
org.apache.storm.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +47,10 @@ public abstract class LocallyCachedBlob { private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedBlob.class); public static final long NOT_DOWNLOADED_VERSION = -1; // A callback that does nothing. - private static final BlobChangingCallback NOOP_CB = (assignment, port, resource, go) -> {}; + private static final BlobChangingCallback NOOP_CB = (assignment, port, resource, go) -> { + }; - private long lastUsed = System.currentTimeMillis(); + private long lastUsed = Time.currentTimeMillis(); private final Map<PortAndAssignment, BlobChangingCallback> references = new HashMap<>(); private final String blobDescription; private final String blobKey; @@ -51,8 +58,9 @@ public abstract class LocallyCachedBlob { /** * Create a new LocallyCachedBlob. + * * @param blobDescription a description of the blob this represents. Typically it should at least be the blob key, but ideally also - * include if it is an archive or not, what user or topology it is for, or if it is a storm.jar etc. + * include if it is an archive or not, what user or topology it is for, or if it is a storm.jar etc. */ protected LocallyCachedBlob(String blobDescription, String blobKey) { this.blobDescription = blobDescription; @@ -60,26 +68,54 @@ public abstract class LocallyCachedBlob { } /** - * Get the version of the blob cached locally. If the version is unknown or it has not been downloaded NOT_DOWNLOADED_VERSION - * should be returned. - * PRECONDITION: this can only be called with a lock on this instance held. + * Get the version of the blob cached locally. If the version is unknown or it has not been downloaded NOT_DOWNLOADED_VERSION should be + * returned. PRECONDITION: this can only be called with a lock on this instance held. */ public abstract long getLocalVersion(); /** - * Get the version of the blob in the blob store. - * PRECONDITION: this can only be called with a lock on this instance held. + * Get the version of the blob in the blob store. PRECONDITION: this can only be called with a lock on this instance held. */ public abstract long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException; /** * Download the latest version to a temp location. This may also include unzipping some or all of the data to a temp location. * PRECONDITION: this can only be called with a lock on this instance held. + * * @param store the store to us to download the data. * @return the version that was downloaded. 
*/ public abstract long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException; + protected static long downloadToTempLocation(ClientBlobStore store, String key, long currentVersion, IAdvancedFSOps fsOps, + Function<Long, Path> getTempPath) + throws KeyNotFoundException, AuthorizationException, IOException { + try (InputStreamWithMeta in = store.getBlob(key)) { + long newVersion = in.getVersion(); + if (newVersion == currentVersion) { + LOG.warn("The version did not change, but going to download again {} {}", currentVersion, key); + } + Path tmpLocation = getTempPath.apply(newVersion); + long totalRead = 0; + //Make sure the parent directory is there and ready to go + fsOps.forceMkdir(tmpLocation.getParent()); + try (OutputStream outStream = fsOps.getOutputStream(tmpLocation.toFile())) { + byte[] buffer = new byte[4096]; + int read = 0; + while ((read = in.read(buffer)) > 0) { + outStream.write(buffer, 0, read); + totalRead += read; + } + } + long expectedSize = in.getFileLength(); + if (totalRead != expectedSize) { + throw new IOException("We expected to download " + expectedSize + " bytes but found we got " + totalRead); + } + + return newVersion; + } + } + /** * Commit the new version and make it available for the end user. * PRECONDITION: uncompressToTempLocationIfNeeded will have been called. @@ -110,32 +146,11 @@ public abstract class LocallyCachedBlob { public abstract long getSizeOnDisk(); /** - * Updates the last updated time. This should be called when references are added or removed. - */ - private synchronized void touch() { - lastUsed = System.currentTimeMillis(); - } - - /** - * Get the last time that this used for LRU calculations. - */ - public synchronized long getLastUsed() { - return lastUsed; - } - - /** - * Return true if this blob is actively being used, else false (meaning it can be deleted, but might not be). - */ - public synchronized boolean isUsed() { - return !references.isEmpty(); - } - - /** * Get the size of p in bytes. * @param p the path to read. * @return the size of p in bytes. */ - protected long getSizeOnDisk(Path p) throws IOException { + protected static long getSizeOnDisk(Path p) throws IOException { if (!Files.exists(p)) { return 0; } else if (Files.isRegularFile(p)) { @@ -156,6 +171,28 @@ public abstract class LocallyCachedBlob { } /** + * Updates the last updated time. This should be called when references are added or removed. + */ + protected synchronized void touch() { + lastUsed = Time.currentTimeMillis(); + LOG.debug("Setting {} ts to {}", blobKey, lastUsed); + } + + /** + * Get the last time that this used for LRU calculations. + */ + public synchronized long getLastUsed() { + return lastUsed; + } + + /** + * Return true if this blob is actively being used, else false (meaning it can be deleted, but might not be). + */ + public synchronized boolean isUsed() { + return !references.isEmpty(); + } + + /** * Mark that a given port and assignment are using this. * @param pna the slot and assignment that are using this blob. * @param cb an optional callback indicating that they want to know/synchronize when a blob is updated. 
@@ -177,6 +214,7 @@ public abstract class LocallyCachedBlob { if (references.remove(pna) == null) { LOG.warn("{} had no reservation for {}", pna, blobDescription); } + touch(); } /** @@ -217,4 +255,11 @@ public abstract class LocallyCachedBlob { public String getKey() { return blobKey; } + + + public Collection<PortAndAssignment> getDependencies() { + return references.keySet(); + } + + public abstract boolean isFullyDownloaded(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java index 35371b5..68415e1 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java @@ -115,14 +115,13 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { public String getTempExtractionDir(long version) { return extractionDir + "." + version; } - }; + } private final TopologyBlobType type; private final String topologyId; private final boolean isLocalMode; private final Path topologyBasicBlobsRootDir; private final AdvancedFSOps fsOps; - private final Map<String, Object> conf; private volatile long version = NOT_DOWNLOADED_VERSION; private volatile long size = 0; @@ -139,7 +138,6 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { this.type = type; this.isLocalMode = isLocalMode; this.fsOps = fsOps; - this.conf = conf; topologyBasicBlobsRootDir = Paths.get(ConfigUtils.supervisorStormDistRoot(conf, topologyId)); readVersion(); updateSizeOnDisk(); @@ -203,31 +201,11 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { return LOCAL_MODE_JAR_VERSION; } - long newVersion; - Path tmpLocation; - String key = type.getKey(topologyId); - try (InputStreamWithMeta in = store.getBlob(key)) { - newVersion = in.getVersion(); - long expectedSize = in.getFileLength(); - if (newVersion == version) { - throw new RuntimeException("The version did not change, but we tried to download it. 
" + version + " " + key); - } - tmpLocation = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion)); - long totalRead = 0; - //Make sure the parent directory is there and ready to go - fsOps.forceMkdir(tmpLocation.getParent()); - try (OutputStream outStream = fsOps.getOutputStream(tmpLocation.toFile())) { - byte [] buffer = new byte[4096]; - int read = 0; - while ((read = in.read(buffer)) > 0) { - outStream.write(buffer, 0, read); - totalRead += read; - } - } - if (totalRead != expectedSize) { - throw new IOException("We expected to download " + expectedSize + " bytes but found we got " + totalRead); - } - } + + long newVersion = downloadToTempLocation(store, type.getKey(topologyId), version, fsOps, + (version) -> topologyBasicBlobsRootDir.resolve(type.getTempFileName(version))); + + Path tmpLocation = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion)); if (type.needsExtraction()) { Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(newVersion)); @@ -247,10 +225,10 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { String name = entry.getName(); if (!entry.isDirectory() && name.startsWith(toRemove)) { String shortenedName = name.replace(toRemove, ""); - Path aFile = dest.resolve(shortenedName); - LOG.debug("EXTRACTING {} SHORTENED to {} into {}", name, shortenedName, aFile); - fsOps.forceMkdir(aFile.getParent()); - try (FileOutputStream out = new FileOutputStream(aFile.toFile()); + Path targetFile = dest.resolve(shortenedName); + LOG.debug("EXTRACTING {} SHORTENED to {} into {}", name, shortenedName, targetFile); + fsOps.forceMkdir(targetFile.getParent()); + try (FileOutputStream out = new FileOutputStream(targetFile.toFile()); InputStream in = jarFile.getInputStream(entry)) { IOUtils.copy(in, out); } @@ -260,6 +238,21 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { } @Override + public boolean isFullyDownloaded() { + Path versionFile = topologyBasicBlobsRootDir.resolve(type.getVersionFileName()); + boolean ret = Files.exists(versionFile); + Path dest = topologyBasicBlobsRootDir.resolve(type.getFileName()); + if (!(isLocalMode && type == TopologyBlobType.TOPO_JAR)) { + ret = ret && Files.exists(dest); + } + if (type.needsExtraction()) { + Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getExtractionDir()); + ret = ret && Files.exists(extractionDest); + } + return ret; + } + + @Override public void commitNewVersion(long newVersion) throws IOException { //This is not atomic (so if something bad happens in the middle we need to be able to recover Path tempLoc = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion)); @@ -325,6 +318,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { if (type.needsExtraction()) { removeAll(type.getExtractionDir()); } + touch(); } private void removeAll(String baseName) throws IOException { http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java index 081c811..bd92173 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java @@ -21,7 +21,7 @@ package org.apache.storm.localizer; import 
org.apache.storm.generated.LocalAssignment; /** - * A Port and a LocalAssignment used to reference count Resources + * A Port and a LocalAssignment used to reference count resources. */ class PortAndAssignment { private final int port; @@ -45,6 +45,10 @@ class PortAndAssignment { return assignment.get_topology_id(); } + public String getOwner() { + return assignment.get_owner(); + } + @Override public int hashCode() { return (17 * port) + assignment.hashCode(); @@ -52,7 +56,7 @@ class PortAndAssignment { @Override public String toString() { - return "{" + port + " " + assignment + "}"; + return "{" + assignment.get_topology_id() + " on " + port + "}"; } /** http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java index 6a4454a..340db1c 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java +++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java @@ -82,10 +82,7 @@ public class ServerUtils { public static final Logger LOG = LoggerFactory.getLogger(ServerUtils.class); public static final boolean IS_ON_WINDOWS = "Windows_NT".equals(System.getenv("OS")); - public static final String CURRENT_BLOB_SUFFIX_ID = "current"; - public static final String DEFAULT_CURRENT_BLOB_SUFFIX = "." + CURRENT_BLOB_SUFFIX_ID; - public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version"; public static final int SIGKILL = 9; public static final int SIGTERM = 15; @@ -166,14 +163,6 @@ public class ServerUtils { return StringUtils.join(changedCommands, " "); } - public static String constructVersionFileName(String fileName) { - return fileName + DEFAULT_BLOB_VERSION_SUFFIX; - } - - public static String constructBlobCurrentSymlinkName(String fileName) { - return fileName + DEFAULT_CURRENT_BLOB_SUFFIX; - } - /** * Takes an input dir or file and returns the disk usage on that local directory. * Very basic implementation. @@ -206,14 +195,6 @@ public class ServerUtils { } } - public static long localVersionOfBlob(String localFile) { - return Utils.getVersionFromBlobVersionFile(new File(localFile + DEFAULT_BLOB_VERSION_SUFFIX)); - } - - public static String constructBlobWithVersionFileName(String fileName, long version) { - return fileName + "." + version; - } - public static ClientBlobStore getClientBlobStoreForSupervisor(Map<String, Object> conf) { ClientBlobStore store; if (ConfigUtils.isLocalMode(conf)) { http://git-wip-us.apache.org/repos/asf/storm/blob/7afd6fbe/storm-server/src/test/java/org/apache/storm/TestingTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/TestingTest.java b/storm-server/src/test/java/org/apache/storm/TestingTest.java new file mode 100644 index 0000000..201b470 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/TestingTest.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.testing.MkClusterParam; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.TestAggregatesCounter; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestJob; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.*; + +/** + * Test that the testing class does what it should do. + */ +public class TestingTest { + + private static final TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> { + TopologyBuilder tb = new TopologyBuilder(); + tb.setSpout("spout", new TestWordSpout(true), 3); + tb.setBolt("2", new TestWordCounter(), 4) + .fieldsGrouping("spout", new Fields("word")); + tb.setBolt("3", new TestGlobalCount()) + .globalGrouping("spout"); + tb.setBolt("4", new TestAggregatesCounter()) + .globalGrouping("2"); + + MockedSources mocked = new MockedSources(); + mocked.addMockData("spout", + new Values("nathan"), + new Values("bob"), + new Values("joey"), + new Values("nathan")); + + Config topoConf = new Config(); + topoConf.setNumWorkers(2); + + CompleteTopologyParam ctp = new CompleteTopologyParam(); + ctp.setMockedSources(mocked); + ctp.setStormConf(topoConf); + + Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, tb.createTopology(), ctp); + List<List<Object>> spoutTuples = Testing.readTuples(results, "spout"); + List<List<Object>> expectedSpoutTuples = Arrays.asList(Arrays.asList("nathan"), Arrays.asList("bob"), Arrays.asList("joey"), + Arrays.asList("nathan")); + assertTrue(expectedSpoutTuples + " expected, but found " + spoutTuples, + Testing.multiseteq(expectedSpoutTuples, spoutTuples)); + + List<List<Object>> twoTuples = Testing.readTuples(results, "2"); + List<List<Object>> expectedTwoTuples = Arrays.asList(Arrays.asList("nathan", 1), Arrays.asList("nathan", 2), + Arrays.asList("bob", 1), Arrays.asList("joey", 1)); + assertTrue(expectedTwoTuples + " expected, but found " + twoTuples, + Testing.multiseteq(expectedTwoTuples, twoTuples)); + + List<List<Object>> threeTuples = Testing.readTuples(results, "3"); + List<List<Object>> expectedThreeTuples = Arrays.asList(Arrays.asList(1), Arrays.asList(2), + Arrays.asList(3), Arrays.asList(4)); + assertTrue(expectedThreeTuples + " expected, but found " + threeTuples, + Testing.multiseteq(expectedThreeTuples, threeTuples)); + + List<List<Object>> fourTuples = Testing.readTuples(results, "4"); + List<List<Object>> expectedFourTuples = Arrays.asList(Arrays.asList(1), Arrays.asList(2), + Arrays.asList(3), Arrays.asList(4)); + assertTrue(expectedFourTuples + " expected, but found " + fourTuples, 
+ Testing.multiseteq(expectedFourTuples, fourTuples)); + }; + + @Test + @Category(IntegrationTest.class) + public void testCompleteTopologyNettySimulated() throws Exception { + Config daemonConf = new Config(); + daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true); + MkClusterParam param = new MkClusterParam(); + param.setSupervisors(4); + param.setDaemonConf(daemonConf); + + Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB); + } + + @Test + @Category(IntegrationTest.class) + public void testCompleteTopologyNetty() throws Exception { + Config daemonConf = new Config(); + daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true); + MkClusterParam param = new MkClusterParam(); + param.setSupervisors(4); + param.setDaemonConf(daemonConf); + + Testing.withLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB); + } + + @Test + @Category(IntegrationTest.class) + public void testCompleteTopologyLocalSimulated() throws Exception { + MkClusterParam param = new MkClusterParam(); + param.setSupervisors(4); + + Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB); + } + + @Test + @Category(IntegrationTest.class) + public void testCompleteTopologyLocal() throws Exception { + MkClusterParam param = new MkClusterParam(); + param.setSupervisors(4); + + Testing.withLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB); + } + +} \ No newline at end of file
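To round out the retention-set changes above, here is a minimal sketch (not part of this patch) of a cache-cleanup pass built only from the signatures introduced here: addResources(ConcurrentMap<String, ? extends LocallyCachedBlob>) and cleanup(ClientBlobStore). The surrounding class and method names are hypothetical, and the sketch sits in org.apache.storm.localizer because the retention-set constructor is package-private.

    package org.apache.storm.localizer;

    import java.util.concurrent.ConcurrentMap;
    import org.apache.storm.blobstore.ClientBlobStore;

    // Sketch only: one cleanup pass over locally cached blobs.
    class RetentionSweepSketch {
        static void sweep(ConcurrentMap<String, ? extends LocallyCachedBlob> cachedBlobs,
                          ClientBlobStore blobStore,
                          long targetCacheSizeBytes) {
            LocalizedResourceRetentionSet retention = new LocalizedResourceRetentionSet(targetCacheSizeBytes);
            // Blobs that are still referenced (isUsed() == true) count toward the current
            // size but are never added as eviction candidates.
            retention.addResources(cachedBlobs);
            // cleanup() first removes blobs whose keys no longer exist in the blobstore,
            // then evicts least-recently-used, unreferenced blobs until the cache is back
            // under the target size; anything deleted is also removed from cachedBlobs.
            retention.cleanup(blobStore);
        }
    }

In practice a caller such as the supervisor's localizer would presumably run a pass like this periodically, adding both the per-user file/archive blobs and the topology blobs before a single cleanup(store) call.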