http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java b/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java deleted file mode 100644 index 353ab56..0000000 --- a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java +++ /dev/null @@ -1,695 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.localizer; - -import org.apache.storm.Config; -import org.apache.storm.DaemonConfig; -import org.apache.storm.blobstore.ClientBlobStore; -import org.apache.storm.blobstore.InputStreamWithMeta; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.KeyNotFoundException; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.ServerUtils; -import org.apache.storm.utils.ObjectReader; -import org.apache.storm.utils.ShellUtils.ExitCodeException; -import org.apache.storm.utils.ShellUtils.ShellCommandExecutor; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.PrintWriter; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Collection; -import java.util.ArrayList; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; - -/** - * Class to download and manage files from the blobstore. It uses an LRU cache - * to determine which files to keep so they can be reused and which files to delete. - */ -public class Localizer { - public static final Logger LOG = LoggerFactory.getLogger(Localizer.class); - public static final String FILECACHE = "filecache"; - public static final String USERCACHE = "usercache"; - // sub directories to store either files or uncompressed archives respectively - public static final String FILESDIR = "files"; - public static final String ARCHIVESDIR = "archives"; - - private static final String TO_UNCOMPRESS = "_tmp_"; - - - - private final Map<String, Object> _conf; - private final int _threadPoolSize; - // thread pool for initial download - private final ExecutorService _execService; - // thread pool for updates - private final ExecutorService _updateExecService; - private final int _blobDownloadRetries; - - // track resources - user to resourceSet - private final ConcurrentMap<String, LocalizedResourceSet> _userRsrc = new - ConcurrentHashMap<String, LocalizedResourceSet>(); - - private final String _localBaseDir; - - // cleanup - private long _cacheTargetSize; - private long _cacheCleanupPeriod; - private ScheduledExecutorService _cacheCleanupService; - - public Localizer(Map<String, Object> conf, String baseDir) { - _conf = conf; - _localBaseDir = baseDir; - // default cache size 10GB, converted to Bytes - _cacheTargetSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB), - 10 * 1024).longValue() << 20; - // default 10 minutes. - _cacheCleanupPeriod = ObjectReader.getInt(_conf.get( - DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 10 * 60 * 1000).longValue(); - - // if we needed we could make config for update thread pool size - _threadPoolSize = ObjectReader.getInt(_conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5); - _blobDownloadRetries = ObjectReader.getInt(_conf.get( - DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3); - - _execService = Executors.newFixedThreadPool(_threadPoolSize); - _updateExecService = Executors.newFixedThreadPool(_threadPoolSize); - reconstructLocalizedResources(); - } - - // For testing, it allows setting size in bytes - protected void setTargetCacheSize(long size) { - _cacheTargetSize = size; - } - - // For testing, be careful as it doesn't clone - ConcurrentMap<String, LocalizedResourceSet> getUserResources() { - return _userRsrc; - } - - public void startCleaner() { - _cacheCleanupService = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryBuilder() - .setNameFormat("Localizer Cache Cleanup") - .build()); - - _cacheCleanupService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - handleCacheCleanup(); - } - }, _cacheCleanupPeriod, _cacheCleanupPeriod, TimeUnit.MILLISECONDS); - } - - public void shutdown() { - if (_cacheCleanupService != null) { - _cacheCleanupService.shutdown(); - } - if (_execService != null) { - _execService.shutdown(); - } - if (_updateExecService != null) { - _updateExecService.shutdown(); - } - } - - // baseDir/supervisor/usercache/ - protected File getUserCacheDir() { - return new File(_localBaseDir, USERCACHE); - } - - // baseDir/supervisor/usercache/user1/ - protected File getLocalUserDir(String userName) { - return new File(getUserCacheDir(), userName); - } - - // baseDir/supervisor/usercache/user1/filecache - public File getLocalUserFileCacheDir(String userName) { - return new File(getLocalUserDir(userName), FILECACHE); - } - - // baseDir/supervisor/usercache/user1/filecache/files - protected File getCacheDirForFiles(File dir) { - return new File(dir, FILESDIR); - } - - // get the directory to put uncompressed archives in - // baseDir/supervisor/usercache/user1/filecache/archives - protected File getCacheDirForArchives(File dir) { - return new File(dir, ARCHIVESDIR); - } - - protected void addLocalizedResourceInDir(String dir, LocalizedResourceSet lrsrcSet, - boolean uncompress) { - File[] lrsrcs = readCurrentBlobs(dir); - - if (lrsrcs != null) { - for (File rsrc : lrsrcs) { - LOG.info("add localized in dir found: " + rsrc); - /// strip off .suffix - String path = rsrc.getPath(); - int p = path.lastIndexOf('.'); - if (p > 0) { - path = path.substring(0, p); - } - LOG.debug("local file is: {} path is: {}", rsrc.getPath(), path); - LocalizedResource lrsrc = new LocalizedResource(new File(path).getName(), path, - uncompress); - lrsrcSet.add(lrsrc.getKey(), lrsrc, uncompress); - } - } - } - - // Looks for files in the directory with .current suffix - protected File[] readCurrentBlobs(String location) { - File dir = new File(location); - File[] files = null; - if (dir.exists()) { - files = dir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.toLowerCase().endsWith(ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - } - }); - } - return files; - } - - // Check to see if there are any existing files already localized. - protected void reconstructLocalizedResources() { - try { - LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath()); - Collection<File> users = ConfigUtils.readDirFiles(getUserCacheDir().getPath()); - if (!(users == null || users.isEmpty())) { - for (File userDir : users) { - String user = userDir.getName(); - LOG.debug("looking in: {} for user: {}", userDir.getPath(), user); - LocalizedResourceSet newSet = new LocalizedResourceSet(user); - LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet); - if (lrsrcSet == null) { - lrsrcSet = newSet; - } - addLocalizedResourceInDir(getCacheDirForFiles(getLocalUserFileCacheDir(user)).getPath(), - lrsrcSet, false); - addLocalizedResourceInDir( - getCacheDirForArchives(getLocalUserFileCacheDir(user)).getPath(), - lrsrcSet, true); - } - } else { - LOG.warn("No left over resources found for any user during reconstructing of local resources at: {}", getUserCacheDir().getPath()); - } - } catch (Exception e) { - LOG.error("ERROR reconstructing localized resources", e); - } - } - - // ignores invalid user/topo/key - public synchronized void removeBlobReference(String key, String user, String topo, - boolean uncompress) throws AuthorizationException, KeyNotFoundException { - LocalizedResourceSet lrsrcSet = _userRsrc.get(user); - if (lrsrcSet != null) { - LocalizedResource lrsrc = lrsrcSet.get(key, uncompress); - if (lrsrc != null) { - LOG.debug("removing blob reference to: {} for topo: {}", key, topo); - lrsrc.removeReference(topo); - } else { - LOG.warn("trying to remove non-existent blob, key: " + key + " for user: " + user + - " topo: " + topo); - } - } else { - LOG.warn("trying to remove blob for non-existent resource set for user: " + user + " key: " - + key + " topo: " + topo); - } - } - - public synchronized void addReferences(List<LocalResource> localresource, String user, - String topo) { - LocalizedResourceSet lrsrcSet = _userRsrc.get(user); - if (lrsrcSet != null) { - for (LocalResource blob : localresource) { - LocalizedResource lrsrc = lrsrcSet.get(blob.getBlobName(), blob.shouldUncompress()); - if (lrsrc != null) { - lrsrc.addReference(topo); - LOG.debug("added reference for topo: {} key: {}", topo, blob); - } else { - LOG.warn("trying to add reference to non-existent blob, key: " + blob + " topo: " + topo); - } - } - } else { - LOG.warn("trying to add reference to non-existent local resource set, " + - "user: " + user + " topo: " + topo); - } - } - - /** - * This function either returns the blob in the existing cache or if it doesn't exist in the - * cache, it will download the blob and will block until the download is complete. - */ - public LocalizedResource getBlob(LocalResource localResource, String user, String topo, - File userFileDir) throws AuthorizationException, KeyNotFoundException, IOException { - ArrayList<LocalResource> arr = new ArrayList<LocalResource>(); - arr.add(localResource); - List<LocalizedResource> results = getBlobs(arr, user, topo, userFileDir); - if (results.isEmpty() || results.size() != 1) { - throw new IOException("Unknown error getting blob: " + localResource + ", for user: " + user + - ", topo: " + topo); - } - return results.get(0); - } - - protected boolean isLocalizedResourceDownloaded(LocalizedResource lrsrc) { - File rsrcFileCurrent = new File(lrsrc.getCurrentSymlinkPath()); - File rsrcFileWithVersion = new File(lrsrc.getFilePathWithVersion()); - File versionFile = new File(lrsrc.getVersionFilePath()); - return (rsrcFileWithVersion.exists() && rsrcFileCurrent.exists() && versionFile.exists()); - } - - protected boolean isLocalizedResourceUpToDate(LocalizedResource lrsrc, - ClientBlobStore blobstore) throws AuthorizationException, KeyNotFoundException { - String localFile = lrsrc.getFilePath(); - long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(lrsrc.getKey(), blobstore); - long currentBlobVersion = ServerUtils.localVersionOfBlob(localFile); - return (nimbusBlobVersion == currentBlobVersion); - } - - protected ClientBlobStore getClientBlobStore() { - return ServerUtils.getClientBlobStoreForSupervisor(_conf); - } - - /** - * This function updates blobs on the supervisor. It uses a separate thread pool and runs - * asynchronously of the download and delete. - */ - public List<LocalizedResource> updateBlobs(List<LocalResource> localResources, - String user) throws AuthorizationException, KeyNotFoundException, IOException { - LocalizedResourceSet lrsrcSet = _userRsrc.get(user); - ArrayList<LocalizedResource> results = new ArrayList<>(); - ArrayList<Callable<LocalizedResource>> updates = new ArrayList<>(); - - if (lrsrcSet == null) { - // resource set must have been removed - return results; - } - ClientBlobStore blobstore = null; - try { - blobstore = getClientBlobStore(); - for (LocalResource localResource: localResources) { - String key = localResource.getBlobName(); - LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress()); - if (lrsrc == null) { - LOG.warn("blob requested for update doesn't exist: {}", key); - continue; - } else if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) { - LOG.warn("symlinks are disabled so blobs cannot be downloaded."); - continue; - } else { - // update it if either the version isn't the latest or if any local blob files are missing - if (!isLocalizedResourceUpToDate(lrsrc, blobstore) || - !isLocalizedResourceDownloaded(lrsrc)) { - LOG.debug("updating blob: {}", key); - updates.add(new DownloadBlob(this, _conf, key, new File(lrsrc.getFilePath()), user, - lrsrc.isUncompressed(), true)); - } - } - } - } finally { - if(blobstore != null) { - blobstore.shutdown(); - } - } - try { - List<Future<LocalizedResource>> futures = _updateExecService.invokeAll(updates); - for (Future<LocalizedResource> futureRsrc : futures) { - try { - LocalizedResource lrsrc = futureRsrc.get(); - // put the resource just in case it was removed at same time by the cleaner - LocalizedResourceSet newSet = new LocalizedResourceSet(user); - LocalizedResourceSet newlrsrcSet = _userRsrc.putIfAbsent(user, newSet); - if (newlrsrcSet == null) { - newlrsrcSet = newSet; - } - newlrsrcSet.putIfAbsent(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed()); - results.add(lrsrc); - } - catch (ExecutionException e) { - LOG.error("Error updating blob: ", e); - if (e.getCause() instanceof AuthorizationException) { - throw (AuthorizationException)e.getCause(); - } - if (e.getCause() instanceof KeyNotFoundException) { - throw (KeyNotFoundException)e.getCause(); - } - } - } - } catch (RejectedExecutionException re) { - LOG.error("Error updating blobs : ", re); - } catch (InterruptedException ie) { - throw new IOException("Interrupted Exception", ie); - } - return results; - } - - /** - * This function either returns the blobs in the existing cache or if they don't exist in the - * cache, it downloads them in parallel (up to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT) - * and will block until all of them have been downloaded - */ - public synchronized List<LocalizedResource> getBlobs(List<LocalResource> localResources, - String user, String topo, File userFileDir) - throws AuthorizationException, KeyNotFoundException, IOException { - if ((boolean)_conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) { - throw new KeyNotFoundException("symlinks are disabled so blobs cannot be downloaded."); - } - LocalizedResourceSet newSet = new LocalizedResourceSet(user); - LocalizedResourceSet lrsrcSet = _userRsrc.putIfAbsent(user, newSet); - if (lrsrcSet == null) { - lrsrcSet = newSet; - } - ArrayList<LocalizedResource> results = new ArrayList<>(); - ArrayList<Callable<LocalizedResource>> downloads = new ArrayList<>(); - - ClientBlobStore blobstore = null; - try { - blobstore = getClientBlobStore(); - for (LocalResource localResource: localResources) { - String key = localResource.getBlobName(); - boolean uncompress = localResource.shouldUncompress(); - LocalizedResource lrsrc = lrsrcSet.get(key, localResource.shouldUncompress()); - boolean isUpdate = false; - if ((lrsrc != null) && (lrsrc.isUncompressed() == localResource.shouldUncompress()) && - (isLocalizedResourceDownloaded(lrsrc))) { - if (isLocalizedResourceUpToDate(lrsrc, blobstore)) { - LOG.debug("blob already exists: {}", key); - lrsrc.addReference(topo); - results.add(lrsrc); - continue; - } - LOG.debug("blob exists but isn't up to date: {}", key); - isUpdate = true; - } - - // go off to blobstore and get it - // assume dir passed in exists and has correct permission - LOG.debug("fetching blob: {}", key); - File downloadDir = getCacheDirForFiles(userFileDir); - File localFile = new File(downloadDir, key); - if (uncompress) { - // for compressed file, download to archives dir - downloadDir = getCacheDirForArchives(userFileDir); - localFile = new File(downloadDir, key); - } - downloadDir.mkdir(); - downloads.add(new DownloadBlob(this, _conf, key, localFile, user, uncompress, - isUpdate)); - } - } finally { - if(blobstore !=null) { - blobstore.shutdown(); - } - } - try { - List<Future<LocalizedResource>> futures = _execService.invokeAll(downloads); - for (Future<LocalizedResource> futureRsrc: futures) { - LocalizedResource lrsrc = futureRsrc.get(); - lrsrc.addReference(topo); - lrsrcSet.add(lrsrc.getKey(), lrsrc, lrsrc.isUncompressed()); - results.add(lrsrc); - } - } catch (ExecutionException e) { - if (e.getCause() instanceof AuthorizationException) - throw (AuthorizationException)e.getCause(); - else if (e.getCause() instanceof KeyNotFoundException) { - throw (KeyNotFoundException)e.getCause(); - } else { - throw new IOException("Error getting blobs", e); - } - } catch (RejectedExecutionException re) { - throw new IOException("RejectedExecutionException: ", re); - } catch (InterruptedException ie) { - throw new IOException("Interrupted Exception", ie); - } - return results; - } - - static class DownloadBlob implements Callable<LocalizedResource> { - - private Localizer _localizer; - private Map _conf; - private String _key; - private File _localFile; - private String _user; - private boolean _uncompress; - private boolean _isUpdate; - - public DownloadBlob(Localizer localizer, Map<String, Object> conf, String key, File localFile, - String user, boolean uncompress, boolean update) { - _localizer = localizer; - _conf = conf; - _key = key; - _localFile = localFile; - _user = user; - _uncompress = uncompress; - _isUpdate = update; - } - - @Override - public LocalizedResource call() - throws AuthorizationException, KeyNotFoundException, IOException { - return _localizer.downloadBlob(_conf, _key, _localFile, _user, _uncompress, - _isUpdate); - } - } - - private LocalizedResource downloadBlob(Map<String, Object> conf, String key, File localFile, - String user, boolean uncompress, boolean isUpdate) - throws AuthorizationException, KeyNotFoundException, IOException { - ClientBlobStore blobstore = null; - try { - blobstore = getClientBlobStore(); - long nimbusBlobVersion = ServerUtils.nimbusVersionOfBlob(key, blobstore); - long oldVersion = ServerUtils.localVersionOfBlob(localFile.toString()); - FileOutputStream out = null; - PrintWriter writer = null; - int numTries = 0; - String localizedPath = localFile.toString(); - String localFileWithVersion = ServerUtils.constructBlobWithVersionFileName(localFile.toString(), - nimbusBlobVersion); - String localVersionFile = ServerUtils.constructVersionFileName(localFile.toString()); - String downloadFile = localFileWithVersion; - if (uncompress) { - // we need to download to temp file and then unpack into the one requested - downloadFile = new File(localFile.getParent(), TO_UNCOMPRESS + localFile.getName()).toString(); - } - while (numTries < _blobDownloadRetries) { - out = new FileOutputStream(downloadFile); - numTries++; - try { - if (!ServerUtils.canUserReadBlob(blobstore.getBlobMeta(key), user, conf)) { - throw new AuthorizationException(user + " does not have READ access to " + key); - } - InputStreamWithMeta in = blobstore.getBlob(key); - byte[] buffer = new byte[1024]; - int len; - while ((len = in.read(buffer)) >= 0) { - out.write(buffer, 0, len); - } - out.close(); - in.close(); - if (uncompress) { - ServerUtils.unpack(new File(downloadFile), new File(localFileWithVersion)); - LOG.debug("uncompressed " + downloadFile + " to: " + localFileWithVersion); - } - - // Next write the version. - LOG.info("Blob: " + key + " updated with new Nimbus-provided version: " + - nimbusBlobVersion + " local version was: " + oldVersion); - // The false parameter ensures overwriting the version file, not appending - writer = new PrintWriter( - new BufferedWriter(new FileWriter(localVersionFile, false))); - writer.println(nimbusBlobVersion); - writer.close(); - - try { - setBlobPermissions(conf, user, localFileWithVersion); - setBlobPermissions(conf, user, localVersionFile); - - // Update the key.current symlink. First create tmp symlink and do - // move of tmp to current so that the operation is atomic. - String tmp_uuid_local = java.util.UUID.randomUUID().toString(); - LOG.debug("Creating a symlink @" + localFile + "." + tmp_uuid_local + " , " + - "linking to: " + localFile + "." + nimbusBlobVersion); - File uuid_symlink = new File(localFile + "." + tmp_uuid_local); - - Files.createSymbolicLink(uuid_symlink.toPath(), - Paths.get(ServerUtils.constructBlobWithVersionFileName(localFile.toString(), - nimbusBlobVersion))); - File current_symlink = new File(ServerUtils.constructBlobCurrentSymlinkName( - localFile.toString())); - Files.move(uuid_symlink.toPath(), current_symlink.toPath(), ATOMIC_MOVE); - } catch (IOException e) { - // if we fail after writing the version file but before we move current link we need to - // restore the old version to the file - try { - PrintWriter restoreWriter = new PrintWriter( - new BufferedWriter(new FileWriter(localVersionFile, false))); - restoreWriter.println(oldVersion); - restoreWriter.close(); - } catch (IOException ignore) {} - throw e; - } - - String oldBlobFile = localFile + "." + oldVersion; - try { - // Remove the old version. Note that if a number of processes have that file open, - // the OS will keep the old blob file around until they all close the handle and only - // then deletes it. No new process will open the old blob, since the users will open the - // blob through the "blob.current" symlink, which always points to the latest version of - // a blob. Remove the old version after the current symlink is updated as to not affect - // anyone trying to read it. - if ((oldVersion != -1) && (oldVersion != nimbusBlobVersion)) { - LOG.info("Removing an old blob file:" + oldBlobFile); - Files.delete(Paths.get(oldBlobFile)); - } - } catch (IOException e) { - // At this point we have downloaded everything and moved symlinks. If the remove of - // old fails just log an error - LOG.error("Exception removing old blob version: " + oldBlobFile); - } - - break; - } catch (AuthorizationException ae) { - // we consider this non-retriable exceptions - if (out != null) { - out.close(); - } - new File(downloadFile).delete(); - throw ae; - } catch (IOException | KeyNotFoundException e) { - if (out != null) { - out.close(); - } - if (writer != null) { - writer.close(); - } - new File(downloadFile).delete(); - if (uncompress) { - try { - FileUtils.deleteDirectory(new File(localFileWithVersion)); - } catch (IOException ignore) {} - } - if (!isUpdate) { - // don't want to remove existing version file if its an update - new File(localVersionFile).delete(); - } - - if (numTries < _blobDownloadRetries) { - LOG.error("Failed to download blob, retrying", e); - } else { - throw e; - } - } - } - return new LocalizedResource(key, localizedPath, uncompress); - } finally { - if(blobstore != null) { - blobstore.shutdown(); - } - } - } - - public void setBlobPermissions(Map<String, Object> conf, String user, String path) - throws IOException { - - if (!ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { - return; - } - String wlCommand = ObjectReader.getString(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER), ""); - if (wlCommand.isEmpty()) { - String stormHome = System.getProperty("storm.home"); - wlCommand = stormHome + "/bin/worker-launcher"; - } - List<String> command = new ArrayList<String>(Arrays.asList(wlCommand, user, "blob", path)); - - String[] commandArray = command.toArray(new String[command.size()]); - ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray); - LOG.info("Setting blob permissions, command: {}", Arrays.toString(commandArray)); - - try { - shExec.execute(); - LOG.debug("output: {}", shExec.getOutput()); - } catch (ExitCodeException e) { - int exitCode = shExec.getExitCode(); - LOG.warn("Exit code from worker-launcher is : " + exitCode, e); - LOG.debug("output: {}", shExec.getOutput()); - throw new IOException("Setting blob permissions failed" + - " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e); - } - } - - - public synchronized void handleCacheCleanup() { - LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(_cacheTargetSize); - // need one large set of all and then clean via LRU - for (LocalizedResourceSet t : _userRsrc.values()) { - toClean.addResources(t); - LOG.debug("Resources to be cleaned after adding {} : {}", t.getUser(), toClean); - } - toClean.cleanup(); - LOG.debug("Resource cleanup: {}", toClean); - for (LocalizedResourceSet t : _userRsrc.values()) { - if (t.getSize() == 0) { - String user = t.getUser(); - - LOG.debug("removing empty set: {}", user); - File userFileCacheDir = getLocalUserFileCacheDir(user); - getCacheDirForFiles(userFileCacheDir).delete(); - getCacheDirForArchives(userFileCacheDir).delete(); - getLocalUserFileCacheDir(user).delete(); - boolean dirsRemoved = getLocalUserDir(user).delete(); - // to catch race with update thread - if (dirsRemoved) { - _userRsrc.remove(user); - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java index 311acda..7b127d2 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java +++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java @@ -33,23 +33,26 @@ import org.apache.storm.blobstore.ClientBlobStore; import org.apache.storm.blobstore.InputStreamWithMeta; import org.apache.storm.blobstore.LocalFsBlobStore; import org.apache.storm.daemon.StormCommon; -import org.apache.storm.generated.*; -import org.apache.storm.localizer.Localizer; +import org.apache.storm.generated.AccessControl; +import org.apache.storm.generated.AccessControlType; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.KeyNotFoundException; +import org.apache.storm.generated.ReadableBlobMeta; +import org.apache.storm.generated.SettableBlobMeta; +import org.apache.storm.generated.StormTopology; import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.scheduler.resource.ResourceUtils; import org.apache.thrift.TException; -import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; @@ -61,7 +64,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; @@ -285,10 +287,6 @@ public class ServerUtils { return Files.getOwner(FileSystems.getDefault().getPath(path)).getName(); } - public static Localizer createLocalizer(Map<String, Object> conf, String baseDir) { - return new Localizer(conf, baseDir); - } - public static String containerFilePath (String dir) { return dir + FILE_PATH_SEPARATOR + "launch_container.sh"; } http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java index eb25566..60b628e 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java @@ -25,7 +25,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.storm.daemon.supervisor.Slot.StaticState; @@ -39,7 +39,7 @@ import org.apache.storm.generated.NodeInfo; import org.apache.storm.generated.ProfileAction; import org.apache.storm.generated.ProfileRequest; import org.apache.storm.generated.WorkerResources; -import org.apache.storm.localizer.ILocalizer; +import org.apache.storm.localizer.AsyncLocalizer; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.Time; @@ -115,7 +115,7 @@ public class SlotTest { @Test public void testEmptyToEmpty() throws Exception { try (SimulatedTime t = new SimulatedTime(1010)){ - ILocalizer localizer = mock(ILocalizer.class); + AsyncLocalizer localizer = mock(AsyncLocalizer.class); LocalState state = mock(LocalState.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); ISupervisor iSuper = mock(ISupervisor.class); @@ -137,7 +137,7 @@ public class SlotTest { LocalAssignment newAssignment = mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0)); - ILocalizer localizer = mock(ILocalizer.class); + AsyncLocalizer localizer = mock(AsyncLocalizer.class); Container container = mock(Container.class); LocalState state = mock(LocalState.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); @@ -146,11 +146,11 @@ public class SlotTest { when(container.readHeartbeat()).thenReturn(hb, hb); @SuppressWarnings("unchecked") - Future<Void> baseFuture = mock(Future.class); + CompletableFuture<Void> baseFuture = mock(CompletableFuture.class); when(localizer.requestDownloadBaseTopologyBlobs(newAssignment, port)).thenReturn(baseFuture); @SuppressWarnings("unchecked") - Future<Void> blobFuture = mock(Future.class); + CompletableFuture<Void> blobFuture = mock(CompletableFuture.class); when(localizer.requestDownloadTopologyBlobs(newAssignment, port)).thenReturn(blobFuture); ISupervisor iSuper = mock(ISupervisor.class); @@ -220,7 +220,7 @@ public class SlotTest { LocalAssignment assignment = mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0)); - ILocalizer localizer = mock(ILocalizer.class); + AsyncLocalizer localizer = mock(AsyncLocalizer.class); Container container = mock(Container.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()-10); @@ -276,7 +276,7 @@ public class SlotTest { LocalAssignment nAssignment = mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0)); - ILocalizer localizer = mock(ILocalizer.class); + AsyncLocalizer localizer = mock(AsyncLocalizer.class); Container nContainer = mock(Container.class); LocalState state = mock(LocalState.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); @@ -285,11 +285,11 @@ public class SlotTest { when(nContainer.readHeartbeat()).thenReturn(nhb, nhb); @SuppressWarnings("unchecked") - Future<Void> baseFuture = mock(Future.class); + CompletableFuture<Void> baseFuture = mock(CompletableFuture.class); when(localizer.requestDownloadBaseTopologyBlobs(nAssignment, port)).thenReturn(baseFuture); @SuppressWarnings("unchecked") - Future<Void> blobFuture = mock(Future.class); + CompletableFuture<Void> blobFuture = mock(CompletableFuture.class); when(localizer.requestDownloadTopologyBlobs(nAssignment, port)).thenReturn(blobFuture); ISupervisor iSuper = mock(ISupervisor.class); @@ -377,7 +377,7 @@ public class SlotTest { when(cContainer.readHeartbeat()).thenReturn(chb); when(cContainer.areAllProcessesDead()).thenReturn(false, true); - ILocalizer localizer = mock(ILocalizer.class); + AsyncLocalizer localizer = mock(AsyncLocalizer.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); ISupervisor iSuper = mock(ISupervisor.class); @@ -437,7 +437,7 @@ public class SlotTest { when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, chb, chb); when(cContainer.runProfiling(any(ProfileRequest.class), anyBoolean())).thenReturn(true); - ILocalizer localizer = mock(ILocalizer.class); + AsyncLocalizer localizer = mock(AsyncLocalizer.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); ISupervisor iSuper = mock(ISupervisor.class); http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java index 2461b33..f49be63 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -15,24 +15,51 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.localizer; +import static org.apache.storm.localizer.AsyncLocalizer.USERCACHE; +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; +import com.google.common.base.Joiner; import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.storm.DaemonConfig; +import org.apache.storm.blobstore.BlobStoreAclHandler; +import org.apache.storm.blobstore.InputStreamWithMeta; +import org.apache.storm.blobstore.LocalFsBlobStore; import org.apache.storm.daemon.supervisor.AdvancedFSOps; +import org.apache.storm.generated.AccessControl; +import org.apache.storm.generated.AccessControlType; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.KeyNotFoundException; +import org.apache.storm.generated.ReadableBlobMeta; +import org.apache.storm.generated.SettableBlobMeta; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.ReflectionUtils; +import org.apache.storm.utils.Utils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.apache.storm.Config; @@ -41,9 +68,16 @@ import org.apache.storm.generated.ExecutorInfo; import org.apache.storm.generated.LocalAssignment; import org.apache.storm.generated.StormTopology; import org.apache.storm.security.auth.DefaultPrincipalToLocal; +import org.mockito.Mockito; public class AsyncLocalizerTest { + private static String getTestLocalizerRoot() { + File f = new File("./target/" + Thread.currentThread().getStackTrace()[2].getMethodName() + "/localizer/"); + f.deleteOnExit(); + return f.getPath(); + } + @Test public void testRequestDownloadBaseTopologyBlobs() throws Exception { final String topoId = "TOPO"; @@ -68,7 +102,6 @@ public class AsyncLocalizerTest { conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName()); conf.put(Config.STORM_CLUSTER_MODE, "distributed"); conf.put(Config.STORM_LOCAL_DIR, stormLocal); - Localizer localizer = mock(Localizer.class); AdvancedFSOps ops = mock(AdvancedFSOps.class); ConfigUtils mockedCU = mock(ConfigUtils.class); ReflectionUtils mockedRU = mock(ReflectionUtils.class); @@ -76,7 +109,7 @@ public class AsyncLocalizerTest { Map<String, Object> topoConf = new HashMap<>(conf); - AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops); + AsyncLocalizer bl = new AsyncLocalizer(conf, getTestLocalizerRoot(), ops, new AtomicReference<>(new HashMap<>()), null); ConfigUtils orig = ConfigUtils.setInstance(mockedCU); ReflectionUtils origRU = ReflectionUtils.setInstance(mockedRU); ServerUtils origUtils = ServerUtils.setInstance(mockedU); @@ -86,7 +119,7 @@ public class AsyncLocalizerTest { when(mockedRU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore); when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf); - Future<Void> f = al.requestDownloadBaseTopologyBlobs(la, port); + Future<Void> f = bl.requestDownloadBaseTopologyBlobs(la, port); f.get(20, TimeUnit.SECONDS); // We should be done now... @@ -102,7 +135,7 @@ public class AsyncLocalizerTest { verify(ops, never()).deleteIfExists(any(File.class)); } finally { - al.shutdown(); + bl.close(); ConfigUtils.setInstance(orig); ReflectionUtils.setInstance(origRU); ServerUtils.setInstance(origUtils); @@ -129,7 +162,7 @@ public class AsyncLocalizerTest { final File userDir = new File(stormLocal, user); final String stormRoot = stormLocal+topoId+"/"; - final String localizerRoot = "/tmp/storm-localizer/"; + final String localizerRoot = getTestLocalizerRoot(); final String simpleLocalFile = localizerRoot + user + "/simple"; final String simpleCurrentLocalFile = localizerRoot + user + "/simple.current"; @@ -146,7 +179,6 @@ public class AsyncLocalizerTest { Map<String, Object> conf = new HashMap<>(); conf.put(Config.STORM_LOCAL_DIR, stormLocal); - Localizer localizer = mock(Localizer.class); AdvancedFSOps ops = mock(AdvancedFSOps.class); ConfigUtils mockedCU = mock(ConfigUtils.class); @@ -157,33 +189,662 @@ public class AsyncLocalizerTest { List<LocalizedResource> localizedList = new ArrayList<>(); LocalizedResource simpleLocal = new LocalizedResource(simpleKey, simpleLocalFile, false); localizedList.add(simpleLocal); - - AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops); + + AsyncLocalizer bl = spy(new AsyncLocalizer(conf, localizerRoot, ops, new AtomicReference<>(new HashMap<>()), null)); ConfigUtils orig = ConfigUtils.setInstance(mockedCU); try { when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot); when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf); when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st); - - when(localizer.getLocalUserFileCacheDir(user)).thenReturn(userDir); - - when(localizer.getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir))).thenReturn(localizedList); - - Future<Void> f = al.requestDownloadTopologyBlobs(la, port); + + //Write the mocking backwards so the actual method is not called on the spy object + doReturn(userDir).when(bl).getLocalUserFileCacheDir(user); + doReturn(localizedList).when(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir)); + + Future<Void> f = bl.requestDownloadTopologyBlobs(la, port); f.get(20, TimeUnit.SECONDS); // We should be done now... - - verify(localizer).getLocalUserFileCacheDir(user); + + verify(bl).getLocalUserFileCacheDir(user); + verify(ops).fileExists(userDir); verify(ops).forceMkdir(userDir); - - verify(localizer).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir)); + + verify(bl).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir)); verify(ops).createSymlink(new File(stormRoot, simpleLocalName), new File(simpleCurrentLocalFile)); } finally { - al.shutdown(); + bl.close(); ConfigUtils.setInstance(orig); } } + + //From LocalizerTest + private File baseDir; + + private final String user1 = "user1"; + private final String user2 = "user2"; + private final String user3 = "user3"; + + private ClientBlobStore mockblobstore = mock(ClientBlobStore.class); + + + class TestLocalizer extends AsyncLocalizer { + + TestLocalizer(Map<String, Object> conf, String baseDir) throws IOException { + super(conf, baseDir, AdvancedFSOps.make(conf), new AtomicReference<>(new HashMap<>()), null); + } + + @Override + protected ClientBlobStore getClientBlobStore() { + return mockblobstore; + } + } + + class TestInputStreamWithMeta extends InputStreamWithMeta { + private InputStream iostream; + + public TestInputStreamWithMeta() { + iostream = IOUtils.toInputStream("some test data for my input stream"); + } + + public TestInputStreamWithMeta(InputStream istream) { + iostream = istream; + } + + @Override + public long getVersion() throws IOException { + return 1; + } + + @Override + public synchronized int read() { + return 0; + } + + @Override + public synchronized int read(byte[] b) + throws IOException { + int length = iostream.read(b); + if (length == 0) { + return -1; + } + return length; + } + + @Override + public long getFileLength() { + return 0; + } + }; + + @Before + public void setUp() throws Exception { + baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID()); + if (!baseDir.mkdir()) { + throw new IOException("failed to create base directory"); + } + } + + @After + public void tearDown() throws Exception { + try { + FileUtils.deleteDirectory(baseDir); + } catch (IOException ignore) {} + } + + protected String joinPath(String... pathList) { + return Joiner.on(File.separator).join(pathList); + } + + public String constructUserCacheDir(String base, String user) { + return joinPath(base, USERCACHE, user); + } + + public String constructExpectedFilesDir(String base, String user) { + return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR); + } + + public String constructExpectedArchivesDir(String base, String user) { + return joinPath(constructUserCacheDir(base, user), AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR); + } + + @Test + public void testDirPaths() throws Exception { + Map<String, Object> conf = new HashMap(); + AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); + + String expectedDir = constructUserCacheDir(baseDir.toString(), user1); + assertEquals("get local user dir doesn't return right value", + expectedDir, localizer.getLocalUserDir(user1).toString()); + + String expectedFileDir = joinPath(expectedDir, AsyncLocalizer.FILECACHE); + assertEquals("get local user file dir doesn't return right value", + expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString()); + } + + @Test + public void testReconstruct() throws Exception { + Map<String, Object> conf = new HashMap(); + + String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1); + String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1); + String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2); + String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2); + + String key1 = "testfile1.txt"; + String key2 = "testfile2.txt"; + String key3 = "testfile3.txt"; + String key4 = "testfile4.txt"; + + String archive1 = "archive1"; + String archive2 = "archive2"; + + File user1file1 = new File(expectedFileDir1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File user1file2 = new File(expectedFileDir1, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File user2file3 = new File(expectedFileDir2, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File user2file4 = new File(expectedFileDir2, key4 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + + File user1archive1 = new File(expectedArchiveDir1, archive1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File user2archive2 = new File(expectedArchiveDir2, archive2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File user1archive1file = new File(user1archive1, "file1"); + File user2archive2file = new File(user2archive2, "file2"); + + // setup some files/dirs to emulate supervisor restart + assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs()); + assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs()); + assertTrue("Failed setup file1", user1file1.createNewFile()); + assertTrue("Failed setup file2", user1file2.createNewFile()); + assertTrue("Failed setup file3", user2file3.createNewFile()); + assertTrue("Failed setup file4", user2file4.createNewFile()); + assertTrue("Failed setup archive dir1", user1archive1.mkdirs()); + assertTrue("Failed setup archive dir2", user2archive2.mkdirs()); + assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile()); + assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile()); + + AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); + + ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>(); + arrUser1Keys.add(new LocalResource(key1, false)); + arrUser1Keys.add(new LocalResource(archive1, true)); + localizer.addReferences(arrUser1Keys, user1, "topo1"); + + LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 3, lrsrcSet.getSize()); + assertEquals("user doesn't match", user1, lrsrcSet.getUser()); + LocalizedResource key1rsrc = lrsrcSet.get(key1, false); + assertNotNull("Local resource doesn't exist but should", key1rsrc); + assertEquals("key doesn't match", key1, key1rsrc.getKey()); + assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount()); + LocalizedResource key2rsrc = lrsrcSet.get(key2, false); + assertNotNull("Local resource doesn't exist but should", key2rsrc); + assertEquals("key doesn't match", key2, key2rsrc.getKey()); + assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount()); + LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true); + assertNotNull("Local resource doesn't exist but should", archive1rsrc); + assertEquals("key doesn't match", archive1, archive1rsrc.getKey()); + assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount()); + + LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2); + assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize()); + assertEquals("user doesn't match", user2, lrsrcSet2.getUser()); + LocalizedResource key3rsrc = lrsrcSet2.get(key3, false); + assertNotNull("Local resource doesn't exist but should", key3rsrc); + assertEquals("key doesn't match", key3, key3rsrc.getKey()); + assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount()); + LocalizedResource key4rsrc = lrsrcSet2.get(key4, false); + assertNotNull("Local resource doesn't exist but should", key4rsrc); + assertEquals("key doesn't match", key4, key4rsrc.getKey()); + assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount()); + LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true); + assertNotNull("Local resource doesn't exist but should", archive2rsrc); + assertEquals("key doesn't match", archive2, archive2rsrc.getKey()); + assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount()); + } + + @Test + public void testArchivesTgz() throws Exception { + testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tgz")), true, 21344); + } + + @Test + public void testArchivesZip() throws Exception { + testArchives(getFileFromResource(joinPath("localizer", "localtest.zip")), false, 21348); + } + + @Test + public void testArchivesTarGz() throws Exception { + testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar.gz")), true, 21344); + } + + @Test + public void testArchivesTar() throws Exception { + testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar")), true, 21344); + } + + @Test + public void testArchivesJar() throws Exception { + testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.jar")), false, 21416); + } + + private File getFileFromResource(String archivePath) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(archivePath).getFile()); + } + + // archive passed in must contain symlink named tmptestsymlink if not a zip file + public void testArchives(File archiveFile, boolean supportSymlinks, int size) throws Exception { + if (Utils.isOnWindows()) { + // Windows should set this to false cause symlink in compressed file doesn't work properly. + supportSymlinks = false; + } + + Map<String, Object> conf = new HashMap(); + // set clean time really high so doesn't kick in + conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); + + String key1 = archiveFile.getName(); + String topo1 = "topo1"; + AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); + // set really small so will do cleanup + localizer.setTargetCacheSize(1); + + ReadableBlobMeta rbm = new ReadableBlobMeta(); + rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); + when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm); + + when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new + FileInputStream(archiveFile.getAbsolutePath()))); + + long timeBefore = System.nanoTime(); + File user1Dir = localizer.getLocalUserFileCacheDir(user1); + assertTrue("failed to create user dir", user1Dir.mkdirs()); + LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), user1, topo1, + user1Dir); + long timeAfter = System.nanoTime(); + + String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1); + String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.ARCHIVESDIR); + assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); + File keyFile = new File(expectedFileDir, key1 + ".0"); + assertTrue("blob not created", keyFile.exists()); + assertTrue("blob is not uncompressed", keyFile.isDirectory()); + File symlinkFile = new File(keyFile, "tmptestsymlink"); + + if (supportSymlinks) { + assertTrue("blob uncompressed doesn't contain symlink", Files.isSymbolicLink( + symlinkFile.toPath())); + } else { + assertTrue("blob symlink file doesn't exist", symlinkFile.exists()); + } + + LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); + assertEquals("user doesn't match", user1, lrsrcSet.getUser()); + LocalizedResource key1rsrc = lrsrcSet.get(key1, true); + assertNotNull("Local resource doesn't exist but should", key1rsrc); + assertEquals("key doesn't match", key1, key1rsrc.getKey()); + assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount()); + assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePathWithVersion()); + assertEquals("size doesn't match", size, key1rsrc.getSize()); + assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc + .getLastAccessTime() <= timeAfter)); + + timeBefore = System.nanoTime(); + localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true); + timeAfter = System.nanoTime(); + + lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); + key1rsrc = lrsrcSet.get(key1, true); + assertNotNull("Local resource doesn't exist but should", key1rsrc); + assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount()); + assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc + .getLastAccessTime() <= timeAfter)); + + // should remove the blob since cache size set really small + localizer.cleanup(); + + lrsrcSet = localizer.getUserResources().get(user1); + assertFalse("blob contents not deleted", symlinkFile.exists()); + assertFalse("blob not deleted", keyFile.exists()); + assertFalse("blob file dir not deleted", new File(expectedFileDir).exists()); + assertFalse("blob dir not deleted", new File(expectedUserDir).exists()); + assertNull("user set should be null", lrsrcSet); + + } + + @Test + public void testBasic() throws Exception { + Map<String, Object> conf = new HashMap(); + // set clean time really high so doesn't kick in + conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); + + String key1 = "key1"; + String topo1 = "topo1"; + AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); + // set really small so will do cleanup + localizer.setTargetCacheSize(1); + + ReadableBlobMeta rbm = new ReadableBlobMeta(); + rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); + when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm); + + when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); + + long timeBefore = System.nanoTime(); + File user1Dir = localizer.getLocalUserFileCacheDir(user1); + assertTrue("failed to create user dir", user1Dir.mkdirs()); + LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, + user1Dir); + long timeAfter = System.nanoTime(); + + String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1); + String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR); + assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); + File keyFile = new File(expectedFileDir, key1); + File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + + assertTrue("blob not created", keyFileCurrentSymlink.exists()); + + LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); + assertEquals("user doesn't match", user1, lrsrcSet.getUser()); + LocalizedResource key1rsrc = lrsrcSet.get(key1, false); + assertNotNull("Local resource doesn't exist but should", key1rsrc); + assertEquals("key doesn't match", key1, key1rsrc.getKey()); + assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount()); + assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePath()); + assertEquals("size doesn't match", 34, key1rsrc.getSize()); + assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc + .getLastAccessTime() <= timeAfter)); + + timeBefore = System.nanoTime(); + localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false); + timeAfter = System.nanoTime(); + + lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); + key1rsrc = lrsrcSet.get(key1, false); + assertNotNull("Local resource doesn't exist but should", key1rsrc); + assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount()); + assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc + .getLastAccessTime() <= timeAfter)); + + // should remove the blob since cache size set really small + localizer.cleanup(); + + lrsrcSet = localizer.getUserResources().get(user1); + assertNull("user set should be null", lrsrcSet); + assertFalse("blob not deleted", keyFile.exists()); + assertFalse("blob dir not deleted", new File(expectedFileDir).exists()); + assertFalse("blob dir not deleted", new File(expectedUserDir).exists()); + } + + @Test + public void testMultipleKeysOneUser() throws Exception { + Map<String, Object> conf = new HashMap(); + // set clean time really high so doesn't kick in + conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); + + String key1 = "key1"; + String topo1 = "topo1"; + String key2 = "key2"; + String key3 = "key3"; + AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); + // set to keep 2 blobs (each of size 34) + localizer.setTargetCacheSize(68); + + ReadableBlobMeta rbm = new ReadableBlobMeta(); + rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); + when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm); + when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); + when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta()); + when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta()); + + List<LocalResource> keys = Arrays.asList(new LocalResource[]{new LocalResource(key1, false), + new LocalResource(key2, false), new LocalResource(key3, false)}); + File user1Dir = localizer.getLocalUserFileCacheDir(user1); + assertTrue("failed to create user dir", user1Dir.mkdirs()); + + List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, user1Dir); + LocalizedResource lrsrc = lrsrcs.get(0); + LocalizedResource lrsrc2 = lrsrcs.get(1); + LocalizedResource lrsrc3 = lrsrcs.get(2); + + String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1, + AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR); + assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); + File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File keyFile2 = new File(expectedFileDir, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File keyFile3 = new File(expectedFileDir, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + + assertTrue("blob not created", keyFile.exists()); + assertTrue("blob not created", keyFile2.exists()); + assertTrue("blob not created", keyFile3.exists()); + assertEquals("size doesn't match", 34, keyFile.length()); + assertEquals("size doesn't match", 34, keyFile2.length()); + assertEquals("size doesn't match", 34, keyFile3.length()); + assertEquals("size doesn't match", 34, lrsrc.getSize()); + assertEquals("size doesn't match", 34, lrsrc3.getSize()); + assertEquals("size doesn't match", 34, lrsrc2.getSize()); + + LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 3, lrsrcSet.getSize()); + assertEquals("user doesn't match", user1, lrsrcSet.getUser()); + + long timeBefore = System.nanoTime(); + localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false); + localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false); + localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false); + long timeAfter = System.nanoTime(); + + // add reference to one and then remove reference again so it has newer timestamp + lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir); + assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= timeBefore && lrsrc + .getLastAccessTime() <= timeAfter)); + localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false); + + // should remove the second blob first + localizer.cleanup(); + + lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 2, lrsrcSet.getSize()); + long end = System.currentTimeMillis() + 100; + while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) { + Thread.sleep(1); + } + assertFalse("blob not deleted", keyFile2.exists()); + assertTrue("blob deleted", keyFile.exists()); + assertTrue("blob deleted", keyFile3.exists()); + + // set size to cleanup another one + localizer.setTargetCacheSize(34); + + // should remove the third blob + localizer.cleanup(); + + lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); + assertTrue("blob deleted", keyFile.exists()); + assertFalse("blob not deleted", keyFile3.exists()); + } + + @Test(expected = AuthorizationException.class) + public void testFailAcls() throws Exception { + Map<String, Object> conf = new HashMap(); + // set clean time really high so doesn't kick in + conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000); + // enable blobstore acl validation + conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true); + + String topo1 = "topo1"; + String key1 = "key1"; + AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); + + ReadableBlobMeta rbm = new ReadableBlobMeta(); + // set acl so user doesn't have read access + AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN); + acl.set_name(user1); + rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl))); + when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm); + when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); + File user1Dir = localizer.getLocalUserFileCacheDir(user1); + assertTrue("failed to create user dir", user1Dir.mkdirs()); + + // This should throw AuthorizationException because auth failed + localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir); + } + + @Test(expected = KeyNotFoundException.class) + public void testKeyNotFoundException() throws Exception { + Map<String, Object> conf = Utils.readStormConfig(); + String key1 = "key1"; + conf.put(Config.STORM_LOCAL_DIR, "target"); + LocalFsBlobStore bs = new LocalFsBlobStore(); + LocalFsBlobStore spy = spy(bs); + Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1); + Mockito.doNothing().when(spy).checkForBlobUpdate(key1); + spy.prepare(conf,null,null); + spy.getBlob(key1, null); + } + + @Test + public void testMultipleUsers() throws Exception { + Map<String, Object> conf = new HashMap(); + // set clean time really high so doesn't kick in + conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); + + String topo1 = "topo1"; + String topo2 = "topo2"; + String topo3 = "topo3"; + String key1 = "key1"; + String key2 = "key2"; + String key3 = "key3"; + AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); + // set to keep 2 blobs (each of size 34) + localizer.setTargetCacheSize(68); + + ReadableBlobMeta rbm = new ReadableBlobMeta(); + rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); + when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm); + when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); + when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta()); + when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta()); + + File user1Dir = localizer.getLocalUserFileCacheDir(user1); + assertTrue("failed to create user dir", user1Dir.mkdirs()); + File user2Dir = localizer.getLocalUserFileCacheDir(user2); + assertTrue("failed to create user dir", user2Dir.mkdirs()); + File user3Dir = localizer.getLocalUserFileCacheDir(user3); + assertTrue("failed to create user dir", user3Dir.mkdirs()); + + LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, + user1Dir); + LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false), user2, topo2, + user2Dir); + LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false), user3, topo3, + user3Dir); + // make sure we support different user reading same blob + LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false), user3, + topo3, user3Dir); + + String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1); + String expectedFileDirUser1 = joinPath(expectedUserDir1, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR); + String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2, + AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR); + String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3, + AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR); + assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists()); + assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists()); + assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists()); + + File keyFile = new File(expectedFileDirUser1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File keyFile2 = new File(expectedFileDirUser2, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File keyFile3 = new File(expectedFileDirUser3, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + File keyFile1user3 = new File(expectedFileDirUser3, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + + assertTrue("blob not created", keyFile.exists()); + assertTrue("blob not created", keyFile2.exists()); + assertTrue("blob not created", keyFile3.exists()); + assertTrue("blob not created", keyFile1user3.exists()); + + LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); + LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2); + assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize()); + LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3); + assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize()); + + localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false); + // should remove key1 + localizer.cleanup(); + + lrsrcSet = localizer.getUserResources().get(user1); + lrsrcSet3 = localizer.getUserResources().get(user3); + assertNull("user set should be null", lrsrcSet); + assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists()); + assertFalse("blob dir not deleted", new File(expectedUserDir1).exists()); + assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize()); + + assertTrue("blob deleted", keyFile2.exists()); + assertFalse("blob not deleted", keyFile.exists()); + assertTrue("blob deleted", keyFile3.exists()); + assertTrue("blob deleted", keyFile1user3.exists()); + } + + @Test + public void testUpdate() throws Exception { + Map<String, Object> conf = new HashMap(); + // set clean time really high so doesn't kick in + conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); + + String key1 = "key1"; + String topo1 = "topo1"; + String topo2 = "topo2"; + AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString()); + + ReadableBlobMeta rbm = new ReadableBlobMeta(); + rbm.set_version(1); + rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); + when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm); + when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); + + File user1Dir = localizer.getLocalUserFileCacheDir(user1); + assertTrue("failed to create user dir", user1Dir.mkdirs()); + LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, + user1Dir); + + String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1); + String expectedFileDir = joinPath(expectedUserDir, AsyncLocalizer.FILECACHE, AsyncLocalizer.FILESDIR); + assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); + File keyFile = new File(expectedFileDir, key1); + File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); + assertTrue("blob not created", keyFileCurrentSymlink.exists()); + File versionFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_BLOB_VERSION_SUFFIX); + assertTrue("blob version file not created", versionFile.exists()); + assertEquals("blob version not correct", 1, ServerUtils.localVersionOfBlob(keyFile.toString())); + + LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); + assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); + + // test another topology getting blob with updated version - it should update version now + rbm.set_version(2); + + localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir); + assertTrue("blob version file not created", versionFile.exists()); + assertEquals("blob version not correct", 2, ServerUtils.localVersionOfBlob(keyFile.toString())); + assertTrue("blob file with version 2 not created", new File(keyFile + ".2").exists()); + + // now test regular updateBlob + rbm.set_version(3); + + ArrayList<LocalResource> arr = new ArrayList<LocalResource>(); + arr.add(new LocalResource(key1, false)); + localizer.updateBlobs(arr, user1); + assertTrue("blob version file not created", versionFile.exists()); + assertEquals("blob version not correct", 3, ServerUtils.localVersionOfBlob(keyFile.toString())); + assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists()); + } }