http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java index 6e03bcf..f9374fe 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -33,10 +34,7 @@ import org.apache.storm.security.auth.NimbusPrincipal; import org.apache.storm.security.auth.SingleUserPrincipal; import org.apache.storm.utils.Utils; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -63,469 +61,454 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.*; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.ClassRule; + public class BlobStoreTest { - private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class); - protected static MiniDFSCluster dfscluster = null; - protected static Configuration hadoopConf = null; - URI base; - File baseFile; - private static Map conf = new HashMap(); - public static final int READ = 0x01; - public static final int WRITE = 0x02; - public static final int ADMIN = 0x04; - - @Before - public void init() { - System.setProperty("test.build.data", "target/test/data"); - initializeConfigs(); - baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID()); - base = baseFile.toURI(); - } - - @After - public void cleanup() - throws IOException { - FileUtils.deleteDirectory(baseFile); - } - - @AfterClass - public static void cleanupAfterClass() throws IOException { - if (dfscluster != null) { - dfscluster.shutdown(); + + @ClassRule + public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule(); + + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class); + URI base; + File baseFile; + private static final Map CONF = new HashMap(); + public static final int READ = 0x01; + public static final int WRITE = 0x02; + public static final int ADMIN = 0x04; + + @Before + public void init() { + initializeConfigs(); + baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID()); + base = baseFile.toURI(); + } + + @After + public void cleanup() + throws IOException { + FileUtils.deleteDirectory(baseFile); + } + + // Method which initializes nimbus admin + public static void initializeConfigs() { + CONF.put(Config.NIMBUS_ADMINS, "admin"); + CONF.put(Config.NIMBUS_SUPERVISOR_USERS, "supervisor"); + } + + //Gets Nimbus Subject with NimbusPrincipal set on it + public static Subject getNimbusSubject() { + Subject nimbus = new Subject(); + nimbus.getPrincipals().add(new NimbusPrincipal()); + return nimbus; + } + + // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization + public static void assertStoreHasExactly(BlobStore store, Subject who, String... keys) + throws IOException, KeyNotFoundException, AuthorizationException { + Set<String> expected = new HashSet<>(Arrays.asList(keys)); + Set<String> found = new HashSet<>(); + Iterator<String> c = store.listKeys(); + while (c.hasNext()) { + String keyName = c.next(); + found.add(keyName); + } + Set<String> extra = new HashSet<>(found); + extra.removeAll(expected); + assertTrue("Found extra keys in the blob store " + extra, extra.isEmpty()); + Set<String> missing = new HashSet<>(expected); + missing.removeAll(found); + assertTrue("Found keys missing from the blob store " + missing, missing.isEmpty()); + } + + public static void assertStoreHasExactly(BlobStore store, String... keys) + throws IOException, KeyNotFoundException, AuthorizationException { + assertStoreHasExactly(store, null, keys); + } + + // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on) + public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException { + try (InputStream in = store.getBlob(key, who)) { + return in.read(); + } + } + + public static int readInt(BlobStore store, String key) + throws IOException, KeyNotFoundException, AuthorizationException { + return readInt(store, null, key); + } + + public static void readAssertEquals(BlobStore store, String key, int value) + throws IOException, KeyNotFoundException, AuthorizationException { + assertEquals(value, readInt(store, key)); } - } - - // Method which initializes nimbus admin - public static void initializeConfigs() { - conf.put(Config.NIMBUS_ADMINS,"admin"); - conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor"); - } - - //Gets Nimbus Subject with NimbusPrincipal set on it - public static Subject getNimbusSubject() { - Subject nimbus = new Subject(); - nimbus.getPrincipals().add(new NimbusPrincipal()); - return nimbus; - } - - // Overloading the assertStoreHasExactly method accomodate Subject in order to check for authorization - public static void assertStoreHasExactly(BlobStore store, Subject who, String ... keys) - throws IOException, KeyNotFoundException, AuthorizationException { - Set<String> expected = new HashSet<String>(Arrays.asList(keys)); - Set<String> found = new HashSet<String>(); - Iterator<String> c = store.listKeys(); - while (c.hasNext()) { - String keyName = c.next(); - found.add(keyName); + + // Checks for assertion when we turn on security + public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value) + throws IOException, KeyNotFoundException, AuthorizationException { + assertEquals(value, readInt(store, who, key)); } - Set<String> extra = new HashSet<String>(found); - extra.removeAll(expected); - assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty()); - Set<String> missing = new HashSet<String>(expected); - missing.removeAll(found); - assertTrue("Found keys missing from the blob store "+missing, missing.isEmpty()); - } - - public static void assertStoreHasExactly(BlobStore store, String ... keys) - throws IOException, KeyNotFoundException, AuthorizationException { - assertStoreHasExactly(store, null, keys); - } - - // Overloading the readInt method accomodate Subject in order to check for authorization (security turned on) - public static int readInt(BlobStore store, Subject who, String key) throws IOException, KeyNotFoundException, AuthorizationException { - InputStream in = store.getBlob(key, who); - try { - return in.read(); - } finally { - in.close(); + + private AutoCloseableBlobStoreContainer initHdfs(String dirName) + throws Exception { + Map conf = new HashMap(); + conf.put(Config.BLOBSTORE_DIR, dirName); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal"); + conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3); + HdfsBlobStore store = new HdfsBlobStore(); + store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0)); + return new AutoCloseableBlobStoreContainer(store); } - } - - public static int readInt(BlobStore store, String key) - throws IOException, KeyNotFoundException, AuthorizationException { - return readInt(store, null, key); - } - - public static void readAssertEquals(BlobStore store, String key, int value) - throws IOException, KeyNotFoundException, AuthorizationException { - assertEquals(value, readInt(store, key)); - } - - // Checks for assertion when we turn on security - public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value) - throws IOException, KeyNotFoundException, AuthorizationException { - assertEquals(value, readInt(store, who, key)); - } - - private HdfsBlobStore initHdfs(String dirName) - throws Exception { - if (hadoopConf == null) { - hadoopConf = new Configuration(); + + private static class AutoCloseableBlobStoreContainer implements AutoCloseable { + + private final HdfsBlobStore blobStore; + + public AutoCloseableBlobStoreContainer(HdfsBlobStore blobStore) { + this.blobStore = blobStore; + } + + @Override + public void close() throws Exception { + this.blobStore.shutdown(); + } + } - try { - if (dfscluster == null) { - dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build(); - dfscluster.waitActive(); - } - } catch (IOException e) { - LOG.error("error creating MiniDFSCluster"); + + @Test + public void testHdfsReplication() + throws Exception { + try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstoreReplication")) { + testReplication("/storm/blobstoreReplication/test", container.blobStore); + } } - Map conf = new HashMap(); - conf.put(Config.BLOBSTORE_DIR, dirName); - conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"org.apache.storm.security.auth.DefaultPrincipalToLocal"); - conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3); - HdfsBlobStore store = new HdfsBlobStore(); - store.prepareInternal(conf, null, dfscluster.getConfiguration(0)); - return store; - } - - @Test - public void testHdfsReplication() - throws Exception { - BlobStore store = initHdfs("/storm/blobstoreReplication"); - testReplication("/storm/blobstoreReplication/test", store); - } - - @Test - public void testBasicHdfs() - throws Exception { - testBasic(initHdfs("/storm/blobstore1")); - } - - @Test - public void testMultipleHdfs() - throws Exception { - // use different blobstore dir so it doesn't conflict with other test - testMultiple(initHdfs("/storm/blobstore2")); - } - - @Test - public void testHdfsWithAuth() - throws Exception { - // use different blobstore dir so it doesn't conflict with other tests - testWithAuthentication(initHdfs("/storm/blobstore3")); - } - - // Test for replication. - public void testReplication(String path, BlobStore store) - throws Exception { - SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - metadata.set_replication_factor(4); - AtomicOutputStream out = store.createBlob("test", metadata, null); - out.write(1); - out.close(); - assertStoreHasExactly(store, "test"); - assertEquals("Blobstore replication not matching", store.getBlobReplication("test", null), 4); - store.deleteBlob("test", null); - - //Test for replication with NIMBUS as user - Subject admin = getSubject("admin"); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - metadata.set_replication_factor(4); - out = store.createBlob("test", metadata, admin); - out.write(1); - out.close(); - assertStoreHasExactly(store, "test"); - assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 4); - store.updateBlobReplication("test", 5, admin); - assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 5); - store.deleteBlob("test", admin); - - //Test for replication using SUPERVISOR access - Subject supervisor = getSubject("supervisor"); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - metadata.set_replication_factor(4); - out = store.createBlob("test", metadata, supervisor); - out.write(1); - out.close(); - assertStoreHasExactly(store, "test"); - assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 4); - store.updateBlobReplication("test", 5, supervisor); - assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 5); - store.deleteBlob("test", supervisor); - - //Test for a user having read or write or admin access to read replication for a blob - String createSubject = "createSubject"; - String writeSubject = "writeSubject"; - String adminSubject = "adminSubject"; - Subject who = getSubject(createSubject); - AccessControl writeAccess = new AccessControl(AccessControlType.USER, READ); - AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN); - writeAccess.set_name(writeSubject); - adminAccess.set_name(adminSubject); - List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess); - metadata = new SettableBlobMeta(acl); - metadata.set_replication_factor(4); - out = store.createBlob("test", metadata, who); - out.write(1); - out.close(); - assertStoreHasExactly(store, "test"); - who = getSubject(writeSubject); - assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 4); - - //Test for a user having WRITE or ADMIN privileges to change replication of a blob - who = getSubject(adminSubject); - store.updateBlobReplication("test", 5, who); - assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 5); - store.deleteBlob("test", getSubject(createSubject)); - } - - public Subject getSubject(String name) { - Subject subject = new Subject(); - SingleUserPrincipal user = new SingleUserPrincipal(name); - subject.getPrincipals().add(user); - return subject; - } - - // Check for Blobstore with authentication - public void testWithAuthentication(BlobStore store) - throws Exception { - //Test for Nimbus Admin - Subject admin = getSubject("admin"); - assertStoreHasExactly(store); - SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - AtomicOutputStream out = store.createBlob("test", metadata, admin); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); - store.deleteBlob("test", admin); - - //Test for Supervisor Admin - Subject supervisor = getSubject("supervisor"); - assertStoreHasExactly(store); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, supervisor); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); - store.deleteBlob("test", supervisor); - - //Test for Nimbus itself as a user - Subject nimbus = getNimbusSubject(); - assertStoreHasExactly(store); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, nimbus); - assertStoreHasExactly(store, "test"); - out.write(1); - out.close(); - store.deleteBlob("test", nimbus); - - // Test with a dummy test_subject for cases where subject !=null (security turned on) - Subject who = getSubject("test_subject"); - assertStoreHasExactly(store); - - // Tests for case when subject != null (security turned on) and - // acls for the blob are set to WORLD_EVERYTHING - metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - out = store.createBlob("test", metadata, who); - out.write(1); - out.close(); - assertStoreHasExactly(store, "test"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test", 1); - - LOG.info("Deleting test"); - store.deleteBlob("test", who); - assertStoreHasExactly(store); - - // Tests for case when subject != null (security turned on) and - // acls are not set for the blob (DEFAULT) - LOG.info("Creating test again"); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - out = store.createBlob("test", metadata, who); - out.write(2); - out.close(); - assertStoreHasExactly(store, "test"); - // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because - // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have - // complete access to the blob - assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test", 2); - - LOG.info("Updating test"); - out = store.updateBlob("test", who); - out.write(3); - out.close(); - assertStoreHasExactly(store, "test"); - readAssertEqualsWithAuth(store, who, "test", 3); - - LOG.info("Updating test again"); - out = store.updateBlob("test", who); - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); - assertStoreHasExactly(store, "test"); - readAssertEqualsWithAuth(store, who, "test", 3); - - //Test for subject with no principals and acls set to WORLD_EVERYTHING - who = new Subject(); - metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - LOG.info("Creating test"); - out = store.createBlob("test-empty-subject-WE", metadata, who); - out.write(2); - out.close(); - assertStoreHasExactly(store, "test-empty-subject-WE", "test"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2); - - //Test for subject with no principals and acls set to DEFAULT - who = new Subject(); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - LOG.info("Creating other"); - out = store.createBlob("test-empty-subject-DEF", metadata, who); - out.write(2); - out.close(); - assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2); - - if (store instanceof HdfsBlobStore) { - ((HdfsBlobStore) store).fullCleanup(1); - } else { - fail("Error the blobstore is of unknowntype"); + + @Test + public void testBasicHdfs() + throws Exception { + try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore1")) { + testBasic(container.blobStore); + } } - try { - out.close(); - } catch (IOException e) { - //This is likely to happen when we try to commit something that - // was cleaned up. This is expected and acceptable. + + @Test + public void testMultipleHdfs() + throws Exception { + // use different blobstore dir so it doesn't conflict with other test + try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore2")) { + testMultiple(container.blobStore); + } } - } - - public void testBasic(BlobStore store) - throws Exception { - assertStoreHasExactly(store); - LOG.info("Creating test"); - // Tests for case when subject == null (security turned off) and - // acls for the blob are set to WORLD_EVERYTHING - SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING); - AtomicOutputStream out = store.createBlob("test", metadata, null); - out.write(1); - out.close(); - assertStoreHasExactly(store, "test"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEquals(store, "test", 1); - - LOG.info("Deleting test"); - store.deleteBlob("test", null); - assertStoreHasExactly(store); - - // The following tests are run for both hdfs and local store to test the - // update blob interface - metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - LOG.info("Creating test again"); - out = store.createBlob("test", metadata, null); - out.write(2); - out.close(); - assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 2); - LOG.info("Updating test"); - out = store.updateBlob("test", null); - out.write(3); - out.close(); - assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 3); - - LOG.info("Updating test again"); - out = store.updateBlob("test", null); - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); - - if (store instanceof HdfsBlobStore) { - ((HdfsBlobStore) store).fullCleanup(1); - } else { - fail("Error the blobstore is of unknowntype"); + + @Test + public void testHdfsWithAuth() + throws Exception { + // use different blobstore dir so it doesn't conflict with other tests + try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) { + testWithAuthentication(container.blobStore); + } } - try { - out.close(); - } catch (IOException e) { - //This is likely to happen when we try to commit something that - // was cleaned up. This is expected and acceptable. + + // Test for replication. + public void testReplication(String path, BlobStore store) + throws Exception { + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + metadata.set_replication_factor(4); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", null), 4); + store.deleteBlob("test", null); + + //Test for replication with NIMBUS as user + Subject admin = getSubject("admin"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + metadata.set_replication_factor(4); + try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 4); + store.updateBlobReplication("test", 5, admin); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", admin), 5); + store.deleteBlob("test", admin); + + //Test for replication using SUPERVISOR access + Subject supervisor = getSubject("supervisor"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + metadata.set_replication_factor(4); + try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 4); + store.updateBlobReplication("test", 5, supervisor); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 5); + store.deleteBlob("test", supervisor); + + //Test for a user having read or write or admin access to read replication for a blob + String createSubject = "createSubject"; + String writeSubject = "writeSubject"; + String adminSubject = "adminSubject"; + Subject who = getSubject(createSubject); + AccessControl writeAccess = new AccessControl(AccessControlType.USER, READ); + AccessControl adminAccess = new AccessControl(AccessControlType.USER, ADMIN); + writeAccess.set_name(writeSubject); + adminAccess.set_name(adminSubject); + List<AccessControl> acl = Arrays.asList(writeAccess, adminAccess); + metadata = new SettableBlobMeta(acl); + metadata.set_replication_factor(4); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + who = getSubject(writeSubject); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 4); + + //Test for a user having WRITE or ADMIN privileges to change replication of a blob + who = getSubject(adminSubject); + store.updateBlobReplication("test", 5, who); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", who), 5); + store.deleteBlob("test", getSubject(createSubject)); } - } - - - public void testMultiple(BlobStore store) - throws Exception { - assertStoreHasExactly(store); - LOG.info("Creating test"); - AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler - .WORLD_EVERYTHING), null); - out.write(1); - out.close(); - assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 1); - - LOG.info("Creating other"); - out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), - null); - out.write(2); - out.close(); - assertStoreHasExactly(store, "test", "other"); - readAssertEquals(store, "test", 1); - readAssertEquals(store, "other", 2); - - LOG.info("Updating other"); - out = store.updateBlob("other", null); - out.write(5); - out.close(); - assertStoreHasExactly(store, "test", "other"); - readAssertEquals(store, "test", 1); - readAssertEquals(store, "other", 5); - - LOG.info("Deleting test"); - store.deleteBlob("test", null); - assertStoreHasExactly(store, "other"); - readAssertEquals(store, "other", 5); - - LOG.info("Creating test again"); - out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), - null); - out.write(2); - out.close(); - assertStoreHasExactly(store, "test", "other"); - readAssertEquals(store, "test", 2); - readAssertEquals(store, "other", 5); - - LOG.info("Updating test"); - out = store.updateBlob("test", null); - out.write(3); - out.close(); - assertStoreHasExactly(store, "test", "other"); - readAssertEquals(store, "test", 3); - readAssertEquals(store, "other", 5); - - LOG.info("Deleting other"); - store.deleteBlob("other", null); - assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 3); - - LOG.info("Updating test again"); - out = store.updateBlob("test", null); - out.write(4); - out.flush(); - LOG.info("SLEEPING"); - Thread.sleep(2); - - if (store instanceof HdfsBlobStore) { - ((HdfsBlobStore) store).fullCleanup(1); - } else { - fail("Error the blobstore is of unknowntype"); - } assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 3); - try { - out.close(); - } catch (IOException e) { - //This is likely to happen when we try to commit something that - // was cleaned up. This is expected and acceptable. + + public Subject getSubject(String name) { + Subject subject = new Subject(); + SingleUserPrincipal user = new SingleUserPrincipal(name); + subject.getPrincipals().add(user); + return subject; + } + + // Check for Blobstore with authentication + public void testWithAuthentication(BlobStore store) + throws Exception { + //Test for Nimbus Admin + Subject admin = getSubject("admin"); + assertStoreHasExactly(store); + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } + store.deleteBlob("test", admin); + + //Test for Supervisor Admin + Subject supervisor = getSubject("supervisor"); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } + store.deleteBlob("test", supervisor); + + //Test for Nimbus itself as a user + Subject nimbus = getNimbusSubject(); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } + store.deleteBlob("test", nimbus); + + // Test with a dummy test_subject for cases where subject !=null (security turned on) + Subject who = getSubject("test_subject"); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls for the blob are set to WORLD_EVERYTHING + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", who); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls are not set for the blob (DEFAULT) + LOG.info("Creating test again"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(2); + } + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because + // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have + // complete access to the blob + assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test", 2); + + LOG.info("Updating test"); + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(3); + } + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + + LOG.info("Updating test again"); + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(4); + } + LOG.info("SLEEPING"); + Thread.sleep(2); + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); + + //Test for subject with no principals and acls set to WORLD_EVERYTHING + who = new Subject(); + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + LOG.info("Creating test"); + try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) { + out.write(2); + } + assertStoreHasExactly(store, "test-empty-subject-WE", "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2); + + //Test for subject with no principals and acls set to DEFAULT + who = new Subject(); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + LOG.info("Creating other"); + try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) { + out.write(2); + } + assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2); + + if (store instanceof HdfsBlobStore) { + ((HdfsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } + } + + public void testBasic(BlobStore store) + throws Exception { + assertStoreHasExactly(store); + LOG.info("Creating test"); + // Tests for case when subject == null (security turned off) and + // acls for the blob are set to WORLD_EVERYTHING + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + readAssertEquals(store, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", null); + assertStoreHasExactly(store); + + // The following tests are run for both hdfs and local store to test the + // update blob interface + metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); + LOG.info("Creating test again"); + try (AtomicOutputStream out = store.createBlob("test", metadata, null)) { + out.write(2); + } + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 2); + LOG.info("Updating test"); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(3); + } + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + + LOG.info("Updating test again"); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(4); + } + LOG.info("SLEEPING"); + Thread.sleep(2); + + if (store instanceof HdfsBlobStore) { + ((HdfsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } + } + + public void testMultiple(BlobStore store) + throws Exception { + assertStoreHasExactly(store); + LOG.info("Creating test"); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), null)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 1); + + LOG.info("Creating other"); + try (AtomicOutputStream out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null)) { + out.write(2); + } + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 1); + readAssertEquals(store, "other", 2); + + LOG.info("Updating other"); + try (AtomicOutputStream out = store.updateBlob("other", null)) { + out.write(5); + } + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 1); + readAssertEquals(store, "other", 5); + + LOG.info("Deleting test"); + store.deleteBlob("test", null); + assertStoreHasExactly(store, "other"); + readAssertEquals(store, "other", 5); + + LOG.info("Creating test again"); + try (AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), + null)) { + out.write(2); + } + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 2); + readAssertEquals(store, "other", 5); + + LOG.info("Updating test"); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(3); + } + assertStoreHasExactly(store, "test", "other"); + readAssertEquals(store, "test", 3); + readAssertEquals(store, "other", 5); + + LOG.info("Deleting other"); + store.deleteBlob("other", null); + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); + + LOG.info("Updating test again"); + try (AtomicOutputStream out = store.updateBlob("test", null)) { + out.write(4); + } + LOG.info("SLEEPING"); + Thread.sleep(2); + + if (store instanceof HdfsBlobStore) { + ((HdfsBlobStore) store).fullCleanup(1); + } else { + fail("Error the blobstore is of unknowntype"); + } + assertStoreHasExactly(store, "test"); + readAssertEquals(store, "test", 3); } - } }
http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java index c49c44b..fd80631 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -23,9 +24,6 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,194 +37,190 @@ import java.util.Map; import static org.junit.Assert.*; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.ClassRule; + public class HdfsBlobStoreImplTest { + + @ClassRule + public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule(); + private static final Logger LOG = LoggerFactory.getLogger(HdfsBlobStoreImplTest.class); - protected static Configuration hadoopConf; - protected static MiniDFSCluster dfscluster; // key dir needs to be number 0 to number of buckets, choose one so we know where to look private static String KEYDIR = "0"; private Path blobDir = new Path("/storm/blobstore1"); private Path fullKeyDir = new Path(blobDir, KEYDIR); private String BLOBSTORE_DATA = "data"; - public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl { + public class TestHdfsBlobStoreImpl extends HdfsBlobStoreImpl implements AutoCloseable { public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf) throws IOException { super(path, conf); } public TestHdfsBlobStoreImpl(Path path, Map<String, Object> conf, - Configuration hconf) throws IOException { + Configuration hconf) throws IOException { super(path, conf, hconf); } protected Path getKeyDir(String key) { return new Path(new Path(blobDir, KEYDIR), key); } - } - - @BeforeClass - public static void init() { - System.setProperty("test.build.data", "target/test/data"); - if (hadoopConf == null) { - hadoopConf = new Configuration(); - } - try { - if (dfscluster == null) { - dfscluster = new MiniDFSCluster.Builder(hadoopConf).build(); - dfscluster.waitActive(); - } - } catch (IOException e) { - LOG.error("error creating MiniDFSCluster"); - } - } - @AfterClass - public static void cleanup() throws IOException { - if (dfscluster != null) { - dfscluster.shutdown(); + @Override + public void close() throws Exception { + this.shutdown(); } } // Be careful about adding additional tests as the dfscluster will be shared - @Test public void testMultiple() throws Exception { String testString = "testingblob"; String validKey = "validkeyBasic"; - FileSystem fs = dfscluster.getFileSystem(); + //Will be closed automatically when shutting down the DFS cluster + FileSystem fs = DFS_CLUSTER_RULE.getDfscluster().getFileSystem(); Map conf = new HashMap(); - TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, hadoopConf); - // should have created blobDir - assertTrue("BlobStore dir wasn't created", fs.exists(blobDir)); - assertEquals("BlobStore dir was created with wrong permissions", + try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) { + // should have created blobDir + assertTrue("BlobStore dir wasn't created", fs.exists(blobDir)); + assertEquals("BlobStore dir was created with wrong permissions", HdfsBlobStoreImpl.BLOBSTORE_DIR_PERMISSION, fs.getFileStatus(blobDir).getPermission()); - // test exist with non-existent key - assertFalse("file exists but shouldn't", hbs.exists("bogus")); - - // test write - BlobStoreFile pfile = hbs.write(validKey, false); - // Adding metadata to avoid null pointer exception - SettableBlobMeta meta = new SettableBlobMeta(); - meta.set_replication_factor(1); - pfile.setMetadata(meta); - OutputStream ios = pfile.getOutputStream(); - ios.write(testString.getBytes(Charset.forName("UTF-8"))); - ios.close(); - - // test commit creates properly - assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir)); - pfile.commit(); - Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA); - assertTrue("blob data not committed", fs.exists(dataFile)); - assertEquals("BlobStore dir was created with wrong permissions", + // test exist with non-existent key + assertFalse("file exists but shouldn't", hbs.exists("bogus")); + + // test write + BlobStoreFile pfile = hbs.write(validKey, false); + // Adding metadata to avoid null pointer exception + SettableBlobMeta meta = new SettableBlobMeta(); + meta.set_replication_factor(1); + pfile.setMetadata(meta); + try (OutputStream ios = pfile.getOutputStream()) { + ios.write(testString.getBytes(StandardCharsets.UTF_8)); + } + + // test commit creates properly + assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir)); + pfile.commit(); + Path dataFile = new Path(new Path(fullKeyDir, validKey), BLOBSTORE_DATA); + assertTrue("blob data not committed", fs.exists(dataFile)); + assertEquals("BlobStore dir was created with wrong permissions", HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission()); - assertTrue("key doesn't exist but should", hbs.exists(validKey)); - - // test read - BlobStoreFile readpFile = hbs.read(validKey); - String readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8"); - assertEquals("string read from blob doesn't match", testString, readString); - - // test listkeys - Iterator<String> keys = hbs.listKeys(); - assertTrue("blob has one key", keys.hasNext()); - assertEquals("one key in blobstore", validKey, keys.next()); - - // delete - hbs.deleteKey(validKey); - assertFalse("key not deleted", fs.exists(dataFile)); - assertFalse("key not deleted", hbs.exists(validKey)); - - // Now do multiple - String testString2 = "testingblob2"; - String validKey2= "validkey2"; - - // test write - pfile = hbs.write(validKey, false); - pfile.setMetadata(meta); - ios = pfile.getOutputStream(); - ios.write(testString.getBytes(Charset.forName("UTF-8"))); - ios.close(); - - // test commit creates properly - assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir)); - pfile.commit(); - assertTrue("blob data not committed", fs.exists(dataFile)); - assertEquals("BlobStore dir was created with wrong permissions", + assertTrue("key doesn't exist but should", hbs.exists(validKey)); + + // test read + BlobStoreFile readpFile = hbs.read(validKey); + try (InputStream inStream = readpFile.getInputStream()) { + String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8); + assertEquals("string read from blob doesn't match", testString, readString); + } + + // test listkeys + Iterator<String> keys = hbs.listKeys(); + assertTrue("blob has one key", keys.hasNext()); + assertEquals("one key in blobstore", validKey, keys.next()); + + // delete + hbs.deleteKey(validKey); + assertFalse("key not deleted", fs.exists(dataFile)); + assertFalse("key not deleted", hbs.exists(validKey)); + + // Now do multiple + String testString2 = "testingblob2"; + String validKey2 = "validkey2"; + + // test write + pfile = hbs.write(validKey, false); + pfile.setMetadata(meta); + try (OutputStream ios = pfile.getOutputStream()) { + ios.write(testString.getBytes(StandardCharsets.UTF_8)); + } + + // test commit creates properly + assertTrue("BlobStore key dir wasn't created", fs.exists(fullKeyDir)); + pfile.commit(); + assertTrue("blob data not committed", fs.exists(dataFile)); + assertEquals("BlobStore dir was created with wrong permissions", HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile).getPermission()); - assertTrue("key doesn't exist but should", hbs.exists(validKey)); - - // test write again - pfile = hbs.write(validKey2, false); - pfile.setMetadata(meta); - OutputStream ios2 = pfile.getOutputStream(); - ios2.write(testString2.getBytes(Charset.forName("UTF-8"))); - ios2.close(); - - // test commit second creates properly - pfile.commit(); - Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA); - assertTrue("blob data not committed", fs.exists(dataFile2)); - assertEquals("BlobStore dir was created with wrong permissions", + assertTrue("key doesn't exist but should", hbs.exists(validKey)); + + // test write again + pfile = hbs.write(validKey2, false); + pfile.setMetadata(meta); + try (OutputStream ios2 = pfile.getOutputStream()) { + ios2.write(testString2.getBytes(StandardCharsets.UTF_8)); + } + + // test commit second creates properly + pfile.commit(); + Path dataFile2 = new Path(new Path(fullKeyDir, validKey2), BLOBSTORE_DATA); + assertTrue("blob data not committed", fs.exists(dataFile2)); + assertEquals("BlobStore dir was created with wrong permissions", HdfsBlobStoreFile.BLOBSTORE_FILE_PERMISSION, fs.getFileStatus(dataFile2).getPermission()); - assertTrue("key doesn't exist but should", hbs.exists(validKey2)); - - // test listkeys - keys = hbs.listKeys(); - int total = 0; - boolean key1Found = false; - boolean key2Found = false; - while(keys.hasNext()) { - total++; - String key = keys.next(); - if (key.equals(validKey)) { - key1Found = true; - } else if (key.equals(validKey2)) { - key2Found = true; - } else { - fail("Found key that wasn't expected: " + key); + assertTrue("key doesn't exist but should", hbs.exists(validKey2)); + + // test listkeys + keys = hbs.listKeys(); + int total = 0; + boolean key1Found = false; + boolean key2Found = false; + while (keys.hasNext()) { + total++; + String key = keys.next(); + if (key.equals(validKey)) { + key1Found = true; + } else if (key.equals(validKey2)) { + key2Found = true; + } else { + fail("Found key that wasn't expected: " + key); + } } + assertEquals("number of keys is wrong", 2, total); + assertTrue("blobstore missing key1", key1Found); + assertTrue("blobstore missing key2", key2Found); + + // test read + readpFile = hbs.read(validKey); + try (InputStream inStream = readpFile.getInputStream()) { + String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8); + assertEquals("string read from blob doesn't match", testString, readString); + } + + // test read + readpFile = hbs.read(validKey2); + try (InputStream inStream = readpFile.getInputStream()) { + String readString = IOUtils.toString(inStream, StandardCharsets.UTF_8); + assertEquals("string read from blob doesn't match", testString2, readString); + } + + hbs.deleteKey(validKey); + assertFalse("key not deleted", hbs.exists(validKey)); + hbs.deleteKey(validKey2); + assertFalse("key not deleted", hbs.exists(validKey2)); } - assertEquals("number of keys is wrong", 2, total); - assertTrue("blobstore missing key1", key1Found); - assertTrue("blobstore missing key2", key2Found); - - // test read - readpFile = hbs.read(validKey); - readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8"); - assertEquals("string read from blob doesn't match", testString, readString); - - // test read - readpFile = hbs.read(validKey2); - readString = IOUtils.toString(readpFile.getInputStream(), "UTF-8"); - assertEquals("string read from blob doesn't match", testString2, readString); - - hbs.deleteKey(validKey); - assertFalse("key not deleted", hbs.exists(validKey)); - hbs.deleteKey(validKey2); - assertFalse("key not deleted", hbs.exists(validKey2)); } @Test - public void testGetFileLength() throws IOException { - FileSystem fs = dfscluster.getFileSystem(); - Map conf = new HashMap(); + public void testGetFileLength() throws Exception { + Map<String, Object> conf = new HashMap<>(); String validKey = "validkeyBasic"; String testString = "testingblob"; - TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, hadoopConf); - BlobStoreFile pfile = hbs.write(validKey, false); - // Adding metadata to avoid null pointer exception - SettableBlobMeta meta = new SettableBlobMeta(); - meta.set_replication_factor(1); - pfile.setMetadata(meta); - OutputStream ios = pfile.getOutputStream(); - ios.write(testString.getBytes(Charset.forName("UTF-8"))); - ios.close(); - assertEquals(testString.getBytes(Charset.forName("UTF-8")).length, pfile.getFileLength()); + try (TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, DFS_CLUSTER_RULE.getHadoopConf())) { + BlobStoreFile pfile = hbs.write(validKey, false); + // Adding metadata to avoid null pointer exception + SettableBlobMeta meta = new SettableBlobMeta(); + meta.set_replication_factor(1); + pfile.setMetadata(meta); + try (OutputStream ios = pfile.getOutputStream()) { + ios.write(testString.getBytes(StandardCharsets.UTF_8)); + } + assertEquals(testString.getBytes(StandardCharsets.UTF_8).length, pfile.getFileLength()); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java index cd828da..0b08e1b 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -40,49 +41,71 @@ import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import org.junit.Before; import org.junit.After; -import org.junit.Ignore; import org.junit.Test; import org.junit.Assert; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.fs.FSDataInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.HashMap; - +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule.Java7Supplier; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) public class AvroGenericRecordBoltTest { - private String hdfsURI; + @Rule + public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(new Java7Supplier<Configuration>() { + @Override + public Configuration get() { + Configuration conf = new Configuration(); + conf.set("fs.trash.interval", "10"); + conf.setBoolean("dfs.permissions", true); + File baseDir = new File("./target/hdfs/").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + return conf; + } + }); + @Mock + private OutputCollector collector; + @Mock + private TopologyContext topologyContext; + private DistributedFileSystem fs; - private MiniDFSCluster hdfsCluster; + private String hdfsURI; private static final String testRoot = "/unittest"; - private static final Schema schema1; - private static final Schema schema2; - private static final Tuple tuple1; - private static final Tuple tuple2; - private static final String schemaV1 = "{\"type\":\"record\"," + - "\"name\":\"myrecord\"," + - "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + - "{ \"name\":\"int1\", \"type\":\"int\" }]}"; - - private static final String schemaV2 = "{\"type\":\"record\"," + - "\"name\":\"myrecord\"," + - "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + - "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," + - "{ \"name\":\"int1\", \"type\":\"int\" }]}"; - - static { - + private static Schema schema1; + private static Schema schema2; + private static Tuple tuple1; + private static Tuple tuple2; + private static final String schemaV1 = "{\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + + private static final String schemaV2 = "{\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + + @BeforeClass + public static void setupClass() { Schema.Parser parser = new Schema.Parser(); schema1 = parser.parse(schemaV1); @@ -100,33 +123,19 @@ public class AvroGenericRecordBoltTest { tuple2 = generateTestTuple(builder2.build()); } - @Mock private OutputCollector collector; - @Mock private TopologyContext topologyContext; - @Before public void setup() throws Exception { - MockitoAnnotations.initMocks(this); - Configuration conf = new Configuration(); - conf.set("fs.trash.interval", "10"); - conf.setBoolean("dfs.permissions", true); - File baseDir = new File("./target/hdfs/").getAbsoluteFile(); - FileUtil.fullyDelete(baseDir); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); - - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - hdfsCluster = builder.build(); - fs = hdfsCluster.getFileSystem(); + fs = dfsClusterRule.getDfscluster().getFileSystem(); hdfsURI = fs.getUri() + "/"; } @After public void shutDown() throws IOException { fs.close(); - hdfsCluster.shutdown(); } - @Test public void multipleTuplesOneFile() throws IOException - { + @Test + public void multipleTuplesOneFile() throws IOException { AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, schemaV1); bolt.prepare(new Config(), topologyContext, collector); @@ -139,8 +148,8 @@ public class AvroGenericRecordBoltTest { verifyAllAvroFiles(testRoot); } - @Test public void multipleTuplesMutliplesFiles() throws IOException - { + @Test + public void multipleTuplesMutliplesFiles() throws IOException { AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, schemaV1); bolt.prepare(new Config(), topologyContext, collector); @@ -153,8 +162,8 @@ public class AvroGenericRecordBoltTest { verifyAllAvroFiles(testRoot); } - @Test public void forwardSchemaChangeWorks() throws IOException - { + @Test + public void forwardSchemaChangeWorks() throws IOException { AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV1); bolt.prepare(new Config(), topologyContext, collector); @@ -167,8 +176,8 @@ public class AvroGenericRecordBoltTest { verifyAllAvroFiles(testRoot); } - @Test public void backwardSchemaChangeWorks() throws IOException - { + @Test + public void backwardSchemaChangeWorks() throws IOException { AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2); bolt.prepare(new Config(), topologyContext, collector); @@ -180,8 +189,8 @@ public class AvroGenericRecordBoltTest { verifyAllAvroFiles(testRoot); } - @Test public void schemaThrashing() throws IOException - { + @Test + public void schemaThrashing() throws IOException { AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2); bolt.prepare(new Config(), topologyContext, collector); @@ -206,19 +215,19 @@ public class AvroGenericRecordBoltTest { FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot); FileRotationPolicy rotationPolicy = - new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); + new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); return new AvroGenericRecordBolt() - .withFsUrl(nameNodeAddr) - .withFileNameFormat(fieldsFileNameFormat) - .withRotationPolicy(rotationPolicy) - .withSyncPolicy(fieldsSyncPolicy); + .withFsUrl(nameNodeAddr) + .withFileNameFormat(fieldsFileNameFormat) + .withRotationPolicy(rotationPolicy) + .withSyncPolicy(fieldsSyncPolicy); } private static Tuple generateTestTuple(GenericRecord record) { TopologyBuilder builder = new TopologyBuilder(); GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + new Config(), new HashMap(), new HashMap(), new HashMap(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("record"); @@ -250,24 +259,23 @@ public class AvroGenericRecordBoltTest { return nonZero; } - private void fileIsGoodAvro (Path path) throws IOException { + private void fileIsGoodAvro(Path path) throws IOException { DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); - FSDataInputStream in = fs.open(path, 0); - FileOutputStream out = new FileOutputStream("target/FOO.avro"); - - byte[] buffer = new byte[100]; - int bytesRead; - while ((bytesRead = in.read(buffer)) > 0) { - out.write(buffer, 0, bytesRead); + try (FSDataInputStream in = fs.open(path, 0); FileOutputStream out = new FileOutputStream("target/FOO.avro")) { + byte[] buffer = new byte[100]; + int bytesRead; + while ((bytesRead = in.read(buffer)) > 0) { + out.write(buffer, 0, bytesRead); + } } - out.close(); java.io.File file = new File("target/FOO.avro"); - DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader); - GenericRecord user = null; - while (dataFileReader.hasNext()) { - user = dataFileReader.next(user); + try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)) { + GenericRecord user = null; + while (dataFileReader.hasNext()) { + user = dataFileReader.next(user); + } } file.delete(); http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java index e8f0702..9386c9f 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -44,7 +45,7 @@ import org.junit.Assert; import org.junit.rules.ExpectedException; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; + import static org.mockito.Mockito.*; import org.apache.hadoop.conf.Configuration; @@ -59,40 +60,50 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import java.io.File; import java.io.IOException; import java.util.HashMap; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class TestHdfsBolt { + @Rule + public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(new MiniDFSClusterRule.Java7Supplier<Configuration>() { + @Override + public Configuration get() { + Configuration conf = new Configuration(); + conf.set("fs.trash.interval", "10"); + conf.setBoolean("dfs.permissions", true); + File baseDir = new File("./target/hdfs/").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + return conf; + } + }); + private String hdfsURI; private DistributedFileSystem fs; - private MiniDFSCluster hdfsCluster; private static final String testRoot = "/unittest"; Tuple tuple1 = generateTestTuple(1, "First Tuple", "SFO", "CA"); Tuple tuple2 = generateTestTuple(1, "Second Tuple", "SJO", "CA"); - @Mock private OutputCollector collector; - @Mock private TopologyContext topologyContext; - @Rule public ExpectedException thrown = ExpectedException.none(); + @Mock + private OutputCollector collector; + @Mock + private TopologyContext topologyContext; + @Rule + public ExpectedException thrown = ExpectedException.none(); @Before public void setup() throws Exception { - MockitoAnnotations.initMocks(this); - Configuration conf = new Configuration(); - conf.set("fs.trash.interval", "10"); - conf.setBoolean("dfs.permissions", true); - File baseDir = new File("./target/hdfs/").getAbsoluteFile(); - FileUtil.fullyDelete(baseDir); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); - - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - hdfsCluster = builder.build(); - fs = hdfsCluster.getFileSystem(); - hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/"; + fs = dfsClusterRule.getDfscluster().getFileSystem(); + hdfsURI = "hdfs://localhost:" + dfsClusterRule.getDfscluster().getNameNodePort() + "/"; } @After public void shutDown() throws IOException { fs.close(); - hdfsCluster.shutdown(); } @Test @@ -134,8 +145,7 @@ public class TestHdfsBolt { } @Test - public void testTwoTuplesOneFile() throws IOException - { + public void testTwoTuplesOneFile() throws IOException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); @@ -150,8 +160,7 @@ public class TestHdfsBolt { } @Test - public void testFailedSync() throws IOException - { + public void testFailedSync() throws IOException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); @@ -167,27 +176,23 @@ public class TestHdfsBolt { // One tuple and one rotation should yield one file with data // The failed executions should not cause rotations and any new files @Test - public void testFailureFilecount() throws IOException, InterruptedException - { + public void testFailureFilecount() throws IOException, InterruptedException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, .000001f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); - try - { + try { bolt.execute(tuple2); } catch (RuntimeException e) { // } - try - { + try { bolt.execute(tuple2); } catch (RuntimeException e) { // } - try - { + try { bolt.execute(tuple2); } catch (RuntimeException e) { // @@ -198,8 +203,7 @@ public class TestHdfsBolt { } @Test - public void testTickTuples() throws IOException - { + public void testTickTuples() throws IOException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 10, 10000f); bolt.prepare(new Config(), topologyContext, collector); @@ -226,36 +230,28 @@ public class TestHdfsBolt { SyncPolicy fieldsSyncPolicy = new CountSyncPolicy(countSync); FileRotationPolicy fieldsRotationPolicy = - new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); + new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot); return new HdfsBolt() - .withFsUrl(nameNodeAddr) - .withFileNameFormat(fieldsFileNameFormat) - .withRecordFormat(fieldsFormat) - .withRotationPolicy(fieldsRotationPolicy) - .withSyncPolicy(fieldsSyncPolicy); + .withFsUrl(nameNodeAddr) + .withFileNameFormat(fieldsFileNameFormat) + .withRecordFormat(fieldsFormat) + .withRotationPolicy(fieldsRotationPolicy) + .withSyncPolicy(fieldsSyncPolicy); } - private Tuple generateTestTuple(Object id, Object msg,Object city,Object state) { + private Tuple generateTestTuple(Object id, Object msg, Object city, Object state) { TopologyBuilder builder = new TopologyBuilder(); GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + new Config(), new HashMap(), new HashMap(), new HashMap(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { - return new Fields("id", "msg","city","state"); + return new Fields("id", "msg", "city", "state"); } }; - return new TupleImpl(topologyContext, new Values(id, msg,city,state), 1, ""); - } - - private void printFiles(String path) throws IOException { - Path p = new Path(path); - FileStatus[] fileStatuses = fs.listStatus(p); - for (FileStatus file : fileStatuses) { - System.out.println("@@@ " + file.getPath() + " [" + file.getLen() + "]"); - } + return new TupleImpl(topologyContext, new Values(id, msg, city, state), 1, ""); } // Generally used to compare how files were actually written and compare to expectations based on total @@ -264,9 +260,11 @@ public class TestHdfsBolt { Path p = new Path(path); int nonZero = 0; - for (FileStatus file : fs.listStatus(p)) - if (file.getLen() > 0) + for (FileStatus file : fs.listStatus(p)) { + if (file.getLen() > 0) { nonZero++; + } + } return nonZero; } @@ -275,9 +273,11 @@ public class TestHdfsBolt { Path p = new Path(path); int zeroLength = 0; - for (FileStatus file : fs.listStatus(p)) - if (file.getLen() == 0) + for (FileStatus file : fs.listStatus(p)) { + if (file.getLen() == 0) { zeroLength++; + } + } return zeroLength; } http://git-wip-us.apache.org/repos/asf/storm/blob/83adfd9d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java index 9913d9d..7fee578 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -17,8 +18,10 @@ */ package org.apache.storm.hdfs.bolt; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + import org.apache.storm.Config; -import org.apache.storm.Constants; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -39,13 +42,10 @@ import org.junit.*; import org.junit.rules.ExpectedException; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import static org.mockito.Mockito.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -56,42 +56,51 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.HashMap; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class TestSequenceFileBolt { private static final Logger LOG = LoggerFactory.getLogger(TestSequenceFileBolt.class); + @Rule + public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(new MiniDFSClusterRule.Java7Supplier<Configuration>() { + @Override + public Configuration get() { + Configuration conf = new Configuration(); + conf.set("fs.trash.interval", "10"); + conf.setBoolean("dfs.permissions", true); + File baseDir = new File("./target/hdfs/").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + return conf; + } + }); + private String hdfsURI; private DistributedFileSystem fs; - private MiniDFSCluster hdfsCluster; private static final String testRoot = "/unittest"; Tuple tuple1 = generateTestTuple(1l, "first tuple"); Tuple tuple2 = generateTestTuple(2l, "second tuple"); - @Mock private OutputCollector collector; - @Mock private TopologyContext topologyContext; - @Rule public ExpectedException thrown = ExpectedException.none(); + @Mock + private OutputCollector collector; + @Mock + private TopologyContext topologyContext; + @Rule + public ExpectedException thrown = ExpectedException.none(); @Before public void setup() throws Exception { - MockitoAnnotations.initMocks(this); - Configuration conf = new Configuration(); - conf.set("fs.trash.interval", "10"); - conf.setBoolean("dfs.permissions", true); - File baseDir = new File("./target/hdfs/").getAbsoluteFile(); - FileUtil.fullyDelete(baseDir); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); - - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - hdfsCluster = builder.build(); - fs = hdfsCluster.getFileSystem(); - hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/"; + fs = dfsClusterRule.getDfscluster().getFileSystem(); + hdfsURI = "hdfs://localhost:" + dfsClusterRule.getDfscluster().getNameNodePort() + "/"; } @After public void shutDown() throws IOException { fs.close(); - hdfsCluster.shutdown(); } @Test @@ -109,8 +118,7 @@ public class TestSequenceFileBolt { } @Test - public void testTwoTuplesOneFile() throws IOException - { + public void testTwoTuplesOneFile() throws IOException { SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); @@ -125,8 +133,7 @@ public class TestSequenceFileBolt { } @Test - public void testFailedSync() throws IOException - { + public void testFailedSync() throws IOException { SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); @@ -142,24 +149,24 @@ public class TestSequenceFileBolt { SyncPolicy fieldsSyncPolicy = new CountSyncPolicy(countSync); FileRotationPolicy fieldsRotationPolicy = - new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); + new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot); SequenceFormat seqFormat = new DefaultSequenceFormat("key", "value"); return new SequenceFileBolt() - .withFsUrl(nameNodeAddr) - .withFileNameFormat(fieldsFileNameFormat) - .withRotationPolicy(fieldsRotationPolicy) - .withSequenceFormat(seqFormat) - .withSyncPolicy(fieldsSyncPolicy); + .withFsUrl(nameNodeAddr) + .withFileNameFormat(fieldsFileNameFormat) + .withRotationPolicy(fieldsRotationPolicy) + .withSequenceFormat(seqFormat) + .withSyncPolicy(fieldsSyncPolicy); } private Tuple generateTestTuple(Long key, String value) { TopologyBuilder builder = new TopologyBuilder(); GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + new Config(), new HashMap(), new HashMap(), new HashMap(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("key", "value");
