Repository: storm Updated Branches: refs/heads/master b5be1d6b0 -> 5c05bb74c
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java deleted file mode 100644 index 70e8e9d..0000000 --- a/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java +++ /dev/null @@ -1,682 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.localizer; - -import org.apache.storm.Config; -import org.apache.storm.DaemonConfig; -import org.apache.storm.blobstore.BlobStoreAclHandler; -import org.apache.storm.blobstore.ClientBlobStore; -import org.apache.storm.blobstore.InputStreamWithMeta; -import org.apache.storm.blobstore.LocalFsBlobStore; -import org.apache.storm.generated.AccessControl; -import org.apache.storm.generated.AccessControlType; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.KeyNotFoundException; -import org.apache.storm.generated.ReadableBlobMeta; -import org.apache.storm.generated.SettableBlobMeta; -import org.apache.storm.utils.ServerUtils; -import org.apache.storm.utils.Utils; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import com.google.common.base.Joiner; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.util.*; - -import static org.apache.storm.localizer.Localizer.USERCACHE; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - - -public class LocalizerTest { - - private File baseDir; - - private final String user1 = "user1"; - private final String user2 = "user2"; - private final String user3 = "user3"; - - private ClientBlobStore mockblobstore = mock(ClientBlobStore.class); - - - class TestLocalizer extends Localizer { - - TestLocalizer(Map<String, Object> conf, String baseDir) { - super(conf, baseDir); - } - - @Override - protected ClientBlobStore getClientBlobStore() { - return mockblobstore; - } - } - - class TestInputStreamWithMeta extends InputStreamWithMeta { - private InputStream iostream; - - public TestInputStreamWithMeta() { - iostream = IOUtils.toInputStream("some test data for my input stream"); - } - - public TestInputStreamWithMeta(InputStream istream) { - iostream = istream; - } - - @Override - public long getVersion() throws IOException { - return 1; - } - - @Override - public synchronized int read() { - return 0; - } - - @Override - public synchronized int read(byte[] b) - throws IOException { - int length = iostream.read(b); - if (length == 0) { - return -1; - } - return length; - } - - @Override - public long getFileLength() { - return 0; - } - }; - - @Before - public void setUp() throws Exception { - baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-"+ UUID.randomUUID()); - if (!baseDir.mkdir()) { - throw new IOException("failed to create base directory"); - } - } - - @After - public void tearDown() throws Exception { - try { - FileUtils.deleteDirectory(baseDir); - } catch (IOException ignore) {} - } - - protected String joinPath(String... pathList) { - return Joiner.on(File.separator).join(pathList); - } - - public String constructUserCacheDir(String base, String user) { - return joinPath(base, USERCACHE, user); - } - - public String constructExpectedFilesDir(String base, String user) { - return joinPath(constructUserCacheDir(base, user), Localizer.FILECACHE, Localizer.FILESDIR); - } - - public String constructExpectedArchivesDir(String base, String user) { - return joinPath(constructUserCacheDir(base, user), Localizer.FILECACHE, Localizer.ARCHIVESDIR); - } - - @Test - public void testDirPaths() throws Exception { - Map<String, Object> conf = new HashMap(); - Localizer localizer = new TestLocalizer(conf, baseDir.toString()); - - String expectedDir = constructUserCacheDir(baseDir.toString(), user1); - assertEquals("get local user dir doesn't return right value", - expectedDir, localizer.getLocalUserDir(user1).toString()); - - String expectedFileDir = joinPath(expectedDir, Localizer.FILECACHE); - assertEquals("get local user file dir doesn't return right value", - expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString()); - } - - @Test - public void testReconstruct() throws Exception { - Map<String, Object> conf = new HashMap(); - - String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1); - String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1); - String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2); - String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2); - - String key1 = "testfile1.txt"; - String key2 = "testfile2.txt"; - String key3 = "testfile3.txt"; - String key4 = "testfile4.txt"; - - String archive1 = "archive1"; - String archive2 = "archive2"; - - File user1file1 = new File(expectedFileDir1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File user1file2 = new File(expectedFileDir1, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File user2file3 = new File(expectedFileDir2, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File user2file4 = new File(expectedFileDir2, key4 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - - File user1archive1 = new File(expectedArchiveDir1, archive1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File user2archive2 = new File(expectedArchiveDir2, archive2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File user1archive1file = new File(user1archive1, "file1"); - File user2archive2file = new File(user2archive2, "file2"); - - // setup some files/dirs to emulate supervisor restart - assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs()); - assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs()); - assertTrue("Failed setup file1", user1file1.createNewFile()); - assertTrue("Failed setup file2", user1file2.createNewFile()); - assertTrue("Failed setup file3", user2file3.createNewFile()); - assertTrue("Failed setup file4", user2file4.createNewFile()); - assertTrue("Failed setup archive dir1", user1archive1.mkdirs()); - assertTrue("Failed setup archive dir2", user2archive2.mkdirs()); - assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile()); - assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile()); - - Localizer localizer = new TestLocalizer(conf, baseDir.toString()); - - ArrayList<LocalResource> arrUser1Keys = new ArrayList<LocalResource>(); - arrUser1Keys.add(new LocalResource(key1, false)); - arrUser1Keys.add(new LocalResource(archive1, true)); - localizer.addReferences(arrUser1Keys, user1, "topo1"); - - LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 3, lrsrcSet.getSize()); - assertEquals("user doesn't match", user1, lrsrcSet.getUser()); - LocalizedResource key1rsrc = lrsrcSet.get(key1, false); - assertNotNull("Local resource doesn't exist but should", key1rsrc); - assertEquals("key doesn't match", key1, key1rsrc.getKey()); - assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount()); - LocalizedResource key2rsrc = lrsrcSet.get(key2, false); - assertNotNull("Local resource doesn't exist but should", key2rsrc); - assertEquals("key doesn't match", key2, key2rsrc.getKey()); - assertEquals("refcount doesn't match", 0, key2rsrc.getRefCount()); - LocalizedResource archive1rsrc = lrsrcSet.get(archive1, true); - assertNotNull("Local resource doesn't exist but should", archive1rsrc); - assertEquals("key doesn't match", archive1, archive1rsrc.getKey()); - assertEquals("refcount doesn't match", 1, archive1rsrc.getRefCount()); - - LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2); - assertEquals("local resource set size wrong", 3, lrsrcSet2.getSize()); - assertEquals("user doesn't match", user2, lrsrcSet2.getUser()); - LocalizedResource key3rsrc = lrsrcSet2.get(key3, false); - assertNotNull("Local resource doesn't exist but should", key3rsrc); - assertEquals("key doesn't match", key3, key3rsrc.getKey()); - assertEquals("refcount doesn't match", 0, key3rsrc.getRefCount()); - LocalizedResource key4rsrc = lrsrcSet2.get(key4, false); - assertNotNull("Local resource doesn't exist but should", key4rsrc); - assertEquals("key doesn't match", key4, key4rsrc.getKey()); - assertEquals("refcount doesn't match", 0, key4rsrc.getRefCount()); - LocalizedResource archive2rsrc = lrsrcSet2.get(archive2, true); - assertNotNull("Local resource doesn't exist but should", archive2rsrc); - assertEquals("key doesn't match", archive2, archive2rsrc.getKey()); - assertEquals("refcount doesn't match", 0, archive2rsrc.getRefCount()); - } - - @Test - public void testArchivesTgz() throws Exception { - testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tgz")), true, 21344); - } - - @Test - public void testArchivesZip() throws Exception { - testArchives(getFileFromResource(joinPath("localizer", "localtest.zip")), false, 21348); - } - - @Test - public void testArchivesTarGz() throws Exception { - testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar.gz")), true, 21344); - } - - @Test - public void testArchivesTar() throws Exception { - testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.tar")), true, 21344); - } - - @Test - public void testArchivesJar() throws Exception { - testArchives(getFileFromResource(joinPath("localizer", "localtestwithsymlink.jar")), false, 21416); - } - - private File getFileFromResource(String archivePath) { - ClassLoader classLoader = getClass().getClassLoader(); - return new File(classLoader.getResource(archivePath).getFile()); - } - - // archive passed in must contain symlink named tmptestsymlink if not a zip file - public void testArchives(File archiveFile, boolean supportSymlinks, int size) throws Exception { - if (Utils.isOnWindows()) { - // Windows should set this to false cause symlink in compressed file doesn't work properly. - supportSymlinks = false; - } - - Map<String, Object> conf = new HashMap(); - // set clean time really high so doesn't kick in - conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); - - String key1 = archiveFile.getName(); - String topo1 = "topo1"; - Localizer localizer = new TestLocalizer(conf, baseDir.toString()); - // set really small so will do cleanup - localizer.setTargetCacheSize(1); - - ReadableBlobMeta rbm = new ReadableBlobMeta(); - rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); - when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm); - - when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new - FileInputStream(archiveFile.getAbsolutePath()))); - - long timeBefore = System.nanoTime(); - File user1Dir = localizer.getLocalUserFileCacheDir(user1); - assertTrue("failed to create user dir", user1Dir.mkdirs()); - LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true), user1, topo1, - user1Dir); - long timeAfter = System.nanoTime(); - - String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1); - String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.ARCHIVESDIR); - assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); - File keyFile = new File(expectedFileDir, key1 + ".0"); - assertTrue("blob not created", keyFile.exists()); - assertTrue("blob is not uncompressed", keyFile.isDirectory()); - File symlinkFile = new File(keyFile, "tmptestsymlink"); - - if (supportSymlinks) { - assertTrue("blob uncompressed doesn't contain symlink", Files.isSymbolicLink( - symlinkFile.toPath())); - } else { - assertTrue("blob symlink file doesn't exist", symlinkFile.exists()); - } - - LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); - assertEquals("user doesn't match", user1, lrsrcSet.getUser()); - LocalizedResource key1rsrc = lrsrcSet.get(key1, true); - assertNotNull("Local resource doesn't exist but should", key1rsrc); - assertEquals("key doesn't match", key1, key1rsrc.getKey()); - assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount()); - assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePathWithVersion()); - assertEquals("size doesn't match", size, key1rsrc.getSize()); - assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc - .getLastAccessTime() <= timeAfter)); - - timeBefore = System.nanoTime(); - localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, true); - timeAfter = System.nanoTime(); - - lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); - key1rsrc = lrsrcSet.get(key1, true); - assertNotNull("Local resource doesn't exist but should", key1rsrc); - assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount()); - assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc - .getLastAccessTime() <= timeAfter)); - - // should remove the blob since cache size set really small - localizer.handleCacheCleanup(); - - lrsrcSet = localizer.getUserResources().get(user1); - assertFalse("blob contents not deleted", symlinkFile.exists()); - assertFalse("blob not deleted", keyFile.exists()); - assertFalse("blob file dir not deleted", new File(expectedFileDir).exists()); - assertFalse("blob dir not deleted", new File(expectedUserDir).exists()); - assertNull("user set should be null", lrsrcSet); - - } - - @Test - public void testBasic() throws Exception { - Map<String, Object> conf = new HashMap(); - // set clean time really high so doesn't kick in - conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); - - String key1 = "key1"; - String topo1 = "topo1"; - Localizer localizer = new TestLocalizer(conf, baseDir.toString()); - // set really small so will do cleanup - localizer.setTargetCacheSize(1); - - ReadableBlobMeta rbm = new ReadableBlobMeta(); - rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); - when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm); - - when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); - - long timeBefore = System.nanoTime(); - File user1Dir = localizer.getLocalUserFileCacheDir(user1); - assertTrue("failed to create user dir", user1Dir.mkdirs()); - LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, - user1Dir); - long timeAfter = System.nanoTime(); - - String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1); - String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.FILESDIR); - assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); - File keyFile = new File(expectedFileDir, key1); - File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - - assertTrue("blob not created", keyFileCurrentSymlink.exists()); - - LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); - assertEquals("user doesn't match", user1, lrsrcSet.getUser()); - LocalizedResource key1rsrc = lrsrcSet.get(key1, false); - assertNotNull("Local resource doesn't exist but should", key1rsrc); - assertEquals("key doesn't match", key1, key1rsrc.getKey()); - assertEquals("refcount doesn't match", 1, key1rsrc.getRefCount()); - assertEquals("file path doesn't match", keyFile.toString(), key1rsrc.getFilePath()); - assertEquals("size doesn't match", 34, key1rsrc.getSize()); - assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc - .getLastAccessTime() <= timeAfter)); - - timeBefore = System.nanoTime(); - localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false); - timeAfter = System.nanoTime(); - - lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); - key1rsrc = lrsrcSet.get(key1, false); - assertNotNull("Local resource doesn't exist but should", key1rsrc); - assertEquals("refcount doesn't match", 0, key1rsrc.getRefCount()); - assertTrue("timestamp not within range", (key1rsrc.getLastAccessTime() >= timeBefore && key1rsrc - .getLastAccessTime() <= timeAfter)); - - // should remove the blob since cache size set really small - localizer.handleCacheCleanup(); - - lrsrcSet = localizer.getUserResources().get(user1); - assertNull("user set should be null", lrsrcSet); - assertFalse("blob not deleted", keyFile.exists()); - assertFalse("blob dir not deleted", new File(expectedFileDir).exists()); - assertFalse("blob dir not deleted", new File(expectedUserDir).exists()); - } - - @Test - public void testMultipleKeysOneUser() throws Exception { - Map<String, Object> conf = new HashMap(); - // set clean time really high so doesn't kick in - conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); - - String key1 = "key1"; - String topo1 = "topo1"; - String key2 = "key2"; - String key3 = "key3"; - Localizer localizer = new TestLocalizer(conf, baseDir.toString()); - // set to keep 2 blobs (each of size 34) - localizer.setTargetCacheSize(68); - - ReadableBlobMeta rbm = new ReadableBlobMeta(); - rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); - when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm); - when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); - when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta()); - when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta()); - - List<LocalResource> keys = Arrays.asList(new LocalResource[]{new LocalResource(key1, false), - new LocalResource(key2, false), new LocalResource(key3, false)}); - File user1Dir = localizer.getLocalUserFileCacheDir(user1); - assertTrue("failed to create user dir", user1Dir.mkdirs()); - - List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, user1, topo1, user1Dir); - LocalizedResource lrsrc = lrsrcs.get(0); - LocalizedResource lrsrc2 = lrsrcs.get(1); - LocalizedResource lrsrc3 = lrsrcs.get(2); - - String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1, - Localizer.FILECACHE, Localizer.FILESDIR); - assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); - File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File keyFile2 = new File(expectedFileDir, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File keyFile3 = new File(expectedFileDir, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - - assertTrue("blob not created", keyFile.exists()); - assertTrue("blob not created", keyFile2.exists()); - assertTrue("blob not created", keyFile3.exists()); - assertEquals("size doesn't match", 34, keyFile.length()); - assertEquals("size doesn't match", 34, keyFile2.length()); - assertEquals("size doesn't match", 34, keyFile3.length()); - assertEquals("size doesn't match", 34, lrsrc.getSize()); - assertEquals("size doesn't match", 34, lrsrc3.getSize()); - assertEquals("size doesn't match", 34, lrsrc2.getSize()); - - LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 3, lrsrcSet.getSize()); - assertEquals("user doesn't match", user1, lrsrcSet.getUser()); - - long timeBefore = System.nanoTime(); - localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false); - localizer.removeBlobReference(lrsrc2.getKey(), user1, topo1, false); - localizer.removeBlobReference(lrsrc3.getKey(), user1, topo1, false); - long timeAfter = System.nanoTime(); - - // add reference to one and then remove reference again so it has newer timestamp - lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir); - assertTrue("timestamp not within range", (lrsrc.getLastAccessTime() >= timeBefore && lrsrc - .getLastAccessTime() <= timeAfter)); - localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false); - - // should remove the second blob first - localizer.handleCacheCleanup(); - - lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 2, lrsrcSet.getSize()); - long end = System.currentTimeMillis() + 100; - while ((end - System.currentTimeMillis()) >= 0 && keyFile2.exists()) { - Thread.sleep(1); - } - assertFalse("blob not deleted", keyFile2.exists()); - assertTrue("blob deleted", keyFile.exists()); - assertTrue("blob deleted", keyFile3.exists()); - - // set size to cleanup another one - localizer.setTargetCacheSize(34); - - // should remove the third blob - localizer.handleCacheCleanup(); - - lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); - assertTrue("blob deleted", keyFile.exists()); - assertFalse("blob not deleted", keyFile3.exists()); - } - - @Test(expected = AuthorizationException.class) - public void testFailAcls() throws Exception { - Map<String, Object> conf = new HashMap(); - // set clean time really high so doesn't kick in - conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000); - // enable blobstore acl validation - conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true); - - String topo1 = "topo1"; - String key1 = "key1"; - Localizer localizer = new TestLocalizer(conf, baseDir.toString()); - - ReadableBlobMeta rbm = new ReadableBlobMeta(); - // set acl so user doesn't have read access - AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN); - acl.set_name(user1); - rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl))); - when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm); - when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); - File user1Dir = localizer.getLocalUserFileCacheDir(user1); - assertTrue("failed to create user dir", user1Dir.mkdirs()); - - // This should throw AuthorizationException because auth failed - localizer.getBlob(new LocalResource(key1, false), user1, topo1, user1Dir); - } - - @Test(expected = KeyNotFoundException.class) - public void testKeyNotFoundException() throws Exception { - Map<String, Object> conf = Utils.readStormConfig(); - String key1 = "key1"; - conf.put(Config.STORM_LOCAL_DIR, "target"); - LocalFsBlobStore bs = new LocalFsBlobStore(); - LocalFsBlobStore spy = spy(bs); - Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1); - Mockito.doNothing().when(spy).checkForBlobUpdate(key1); - spy.prepare(conf,null,null); - spy.getBlob(key1, null); - } - - @Test - public void testMultipleUsers() throws Exception { - Map<String, Object> conf = new HashMap(); - // set clean time really high so doesn't kick in - conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); - - String topo1 = "topo1"; - String topo2 = "topo2"; - String topo3 = "topo3"; - String key1 = "key1"; - String key2 = "key2"; - String key3 = "key3"; - Localizer localizer = new TestLocalizer(conf, baseDir.toString()); - // set to keep 2 blobs (each of size 34) - localizer.setTargetCacheSize(68); - - ReadableBlobMeta rbm = new ReadableBlobMeta(); - rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); - when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm); - when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); - when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta()); - when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta()); - - File user1Dir = localizer.getLocalUserFileCacheDir(user1); - assertTrue("failed to create user dir", user1Dir.mkdirs()); - File user2Dir = localizer.getLocalUserFileCacheDir(user2); - assertTrue("failed to create user dir", user2Dir.mkdirs()); - File user3Dir = localizer.getLocalUserFileCacheDir(user3); - assertTrue("failed to create user dir", user3Dir.mkdirs()); - - LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, - user1Dir); - LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false), user2, topo2, - user2Dir); - LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false), user3, topo3, - user3Dir); - // make sure we support different user reading same blob - LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false), user3, - topo3, user3Dir); - - String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1); - String expectedFileDirUser1 = joinPath(expectedUserDir1, Localizer.FILECACHE, Localizer.FILESDIR); - String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2, - Localizer.FILECACHE, Localizer.FILESDIR); - String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3, - Localizer.FILECACHE, Localizer.FILESDIR); - assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists()); - assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists()); - assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists()); - - File keyFile = new File(expectedFileDirUser1, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File keyFile2 = new File(expectedFileDirUser2, key2 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File keyFile3 = new File(expectedFileDirUser3, key3 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - File keyFile1user3 = new File(expectedFileDirUser3, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - - assertTrue("blob not created", keyFile.exists()); - assertTrue("blob not created", keyFile2.exists()); - assertTrue("blob not created", keyFile3.exists()); - assertTrue("blob not created", keyFile1user3.exists()); - - LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); - LocalizedResourceSet lrsrcSet2 = localizer.getUserResources().get(user2); - assertEquals("local resource set size wrong", 1, lrsrcSet2.getSize()); - LocalizedResourceSet lrsrcSet3 = localizer.getUserResources().get(user3); - assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize()); - - localizer.removeBlobReference(lrsrc.getKey(), user1, topo1, false); - // should remove key1 - localizer.handleCacheCleanup(); - - lrsrcSet = localizer.getUserResources().get(user1); - lrsrcSet3 = localizer.getUserResources().get(user3); - assertNull("user set should be null", lrsrcSet); - assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists()); - assertFalse("blob dir not deleted", new File(expectedUserDir1).exists()); - assertEquals("local resource set size wrong", 2, lrsrcSet3.getSize()); - - assertTrue("blob deleted", keyFile2.exists()); - assertFalse("blob not deleted", keyFile.exists()); - assertTrue("blob deleted", keyFile3.exists()); - assertTrue("blob deleted", keyFile1user3.exists()); - } - - @Test - public void testUpdate() throws Exception { - Map<String, Object> conf = new HashMap(); - // set clean time really high so doesn't kick in - conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60*60*1000); - - String key1 = "key1"; - String topo1 = "topo1"; - String topo2 = "topo2"; - Localizer localizer = new TestLocalizer(conf, baseDir.toString()); - - ReadableBlobMeta rbm = new ReadableBlobMeta(); - rbm.set_version(1); - rbm.set_settable(new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING)); - when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm); - when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta()); - - File user1Dir = localizer.getLocalUserFileCacheDir(user1); - assertTrue("failed to create user dir", user1Dir.mkdirs()); - LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1, - user1Dir); - - String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1); - String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.FILESDIR); - assertTrue("user filecache dir not created", new File(expectedFileDir).exists()); - File keyFile = new File(expectedFileDir, key1); - File keyFileCurrentSymlink = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX); - assertTrue("blob not created", keyFileCurrentSymlink.exists()); - File versionFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_BLOB_VERSION_SUFFIX); - assertTrue("blob version file not created", versionFile.exists()); - assertEquals("blob version not correct", 1, ServerUtils.localVersionOfBlob(keyFile.toString())); - - LocalizedResourceSet lrsrcSet = localizer.getUserResources().get(user1); - assertEquals("local resource set size wrong", 1, lrsrcSet.getSize()); - - // test another topology getting blob with updated version - it should update version now - rbm.set_version(2); - - localizer.getBlob(new LocalResource(key1, false), user1, topo2, user1Dir); - assertTrue("blob version file not created", versionFile.exists()); - assertEquals("blob version not correct", 2, ServerUtils.localVersionOfBlob(keyFile.toString())); - assertTrue("blob file with version 2 not created", new File(keyFile + ".2").exists()); - - // now test regular updateBlob - rbm.set_version(3); - - ArrayList<LocalResource> arr = new ArrayList<LocalResource>(); - arr.add(new LocalResource(key1, false)); - localizer.updateBlobs(arr, user1); - assertTrue("blob version file not created", versionFile.exists()); - assertEquals("blob version not correct", 3, ServerUtils.localVersionOfBlob(keyFile.toString())); - assertTrue("blob file with version 3 not created", new File(keyFile + ".3").exists()); - } -}