http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java 
b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 7db41b5..2db0240 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -74,10 +74,10 @@ import org.apache.storm.generated.RebalanceOptions;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.SubmitOptions;
-import org.apache.storm.generated.SupervisorPageInfo;
 import org.apache.storm.generated.SupervisorAssignments;
-import org.apache.storm.generated.SupervisorWorkerHeartbeats;
+import org.apache.storm.generated.SupervisorPageInfo;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.generated.SupervisorWorkerHeartbeats;
 import org.apache.storm.generated.TopologyHistoryInfo;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologyPageInfo;
@@ -111,253 +111,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A stand alone storm cluster that runs inside a single process.
- * It is intended to be used for testing.  Both internal testing for
- * Apache Storm itself and for people building storm topologies.
- *<p>
- * LocalCluster is an AutoCloseable so if you are using it in tests you can use
- * a try block to be sure it is shut down.
+ * A stand alone storm cluster that runs inside a single process. It is 
intended to be used for testing.  Both internal testing for Apache
+ * Storm itself and for people building storm topologies.
+ * <p>
+ * LocalCluster is an AutoCloseable so if you are using it in tests you can 
use a try block to be sure it is shut down.
  * </p>
- * try (LocalCluster cluster = new LocalCluster()) {
- *     // Do some tests
- * }
- * // The cluster has been shut down.
+ * try (LocalCluster cluster = new LocalCluster()) { // Do some tests } // The 
cluster has been shut down.
  */
 public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
+    public static final KillOptions KILL_NOW = new KillOptions();
     private static final Logger LOG = 
LoggerFactory.getLogger(LocalCluster.class);
-    
-    private static ThriftServer startNimbusDaemon(Map<String, Object> conf, 
Nimbus nimbus) {
-        ThriftServer ret = new ThriftServer(conf, new Processor<>(nimbus), 
ThriftConnectionType.NIMBUS);
-        LOG.info("Starting Nimbus server...");
-        new Thread(() -> ret.serve()).start();
-        return ret;
-    }
-    
-    /**
-     * Simple way to configure a LocalCluster to meet your needs.
-     */
-    public static class Builder {
-        private int supervisors = 2;
-        private int portsPerSupervisor = 3;
-        private Map<String, Object> daemonConf = new HashMap<>();
-        private INimbus inimbus = null;
-        private IGroupMappingServiceProvider groupMapper = null;
-        private int supervisorSlotPortMin = 1024;
-        private boolean nimbusDaemon = false;
-        private UnaryOperator<Nimbus> nimbusWrapper = null;
-        private BlobStore store = null;
-        private TopoCache topoCache = null;
-        private IStormClusterState clusterState = null;
-        private ILeaderElector leaderElector = null;
-        private String trackId = null;
-        private boolean simulateTime = false;
-        
-        /**
-         * Set the number of supervisors the cluster should have.
-         */
-        public Builder withSupervisors(int supervisors) {
-            if (supervisors < 0) {
-                throw new IllegalArgumentException("supervisors cannot be 
negative");
-            }
-            this.supervisors = supervisors;
-            return this;
-        }
-        
-        /**
-         * Set the number of slots/ports each supervisor should have.
-         */
-        public Builder withPortsPerSupervisor(int portsPerSupervisor) {
-            if (portsPerSupervisor < 0) {
-                throw new IllegalArgumentException("supervisor ports cannot be 
negative");
-            }
-            this.portsPerSupervisor = portsPerSupervisor;
-            return this;
-        }
-        
-        /**
-         * Set the base config that the daemons should use.
-         */
-        public Builder withDaemonConf(Map<String, Object> conf) {
-            if (conf != null) {
-                this.daemonConf = new HashMap<>(conf);
-            }
-            return this;
-        }
-        
-        /**
-         * Add an single key/value config to the daemon conf.
-         */
-        public Builder withDaemonConf(String key, Object value) {
-            this.daemonConf.put(key, value);
-            return this;
-        }
-        
-        /**
-         * Override the INimbus instance that nimbus will use.
-         */
-        public Builder withINimbus(INimbus inimbus) {
-            this.inimbus = inimbus;
-            return this;
-        }
-        
-        /**
-         * Override the code that maps users to groups for authorization.
-         */
-        public Builder withGroupMapper(IGroupMappingServiceProvider 
groupMapper) {
-            this.groupMapper = groupMapper;
-            return this;
-        }
-        
-        /**
-         * When assigning ports to worker slots start at minPort.
-         */
-        public Builder withSupervisorSlotPortMin(Number minPort) {
-            int port = 1024;
-            if (minPort == null) {
-                LOG.warn("Number is null... {}", minPort);
-            } else {
-                port = minPort.intValue();
-            }
-            if (port <= 0) {
-                throw new IllegalArgumentException("port must be positive");
-            }
-            this.supervisorSlotPortMin = port;
-            return this;
-        }
-        
-        /**
-         * Have the local nimbus actually launch a thrift server.  This is 
intended to
-         * be used mostly for internal storm testing. 
-         */
-        public Builder withNimbusDaemon() {
-            return withNimbusDaemon(true);
-        }
-
-        /**
-         * If nimbusDaemon is true the local nimbus will launch a thrift 
server.  This is intended to
-         * be used mostly for internal storm testing. 
-         */
-        public Builder withNimbusDaemon(Boolean nimbusDaemon) {
-            if (nimbusDaemon == null) {
-                nimbusDaemon = false;
-                LOG.warn("nimbusDaemon is null");
-            }
-            this.nimbusDaemon = nimbusDaemon;
-            return this;
-        }
-        
-        /**
-         * Turn on simulated time in the cluster.  This allows someone to 
simulate long periods of
-         * time for timeouts etc when testing nimbus/supervisors themselves.  
NOTE: that this only
-         * works for code that uses the {@link org.apache.storm.utils.Time} 
class for time management
-         * so it will not work in all cases.
-         */
-        public Builder withSimulatedTime() {
-            return withSimulatedTime(true);
-        }
-
-        /**
-         * Turn on simulated time in the cluster.  This allows someone to 
simulate long periods of
-         * time for timeouts etc when testing nimbus/supervisors themselves.  
NOTE: that this only
-         * works for code that uses the {@link org.apache.storm.utils.Time} 
class for time management
-         * so it will not work in all cases.
-         */
-        public Builder withSimulatedTime(boolean simulateTime) {
-            this.simulateTime = simulateTime;
-            return this;
-        }
-        
-        /**
-         * Before nimbus is created/used call nimbusWrapper on it first and 
use the
-         * result instead.  This is intended for internal testing only, and it 
here to
-         * allow a mocking framework to spy on the nimbus class.
-         */
-        public Builder withNimbusWrapper(UnaryOperator<Nimbus> nimbusWrapper) {
-            this.nimbusWrapper = nimbusWrapper;
-            return this;
-        }
-        
-        /**
-         * Use the following blobstore instead of the one in the config.
-         * This is intended mostly for internal testing with Mocks.
-         */
-        public Builder withBlobStore(BlobStore store) {
-            this.store = store;
-            return this;
-        }
-
-        /**
-         * Use the following topo cache instead of creating out own.
-         * This is intended mostly for internal testing with Mocks.
-         */
-        public Builder withTopoCache(TopoCache topoCache) {
-            this.topoCache = topoCache;
-            return this;
-        }
 
-        /**
-         * Use the following clusterState instead of the one in the config.
-         * This is intended mostly for internal testing with Mocks.
-         */
-        public Builder withClusterState(IStormClusterState clusterState) {
-            this.clusterState = clusterState;
-            return this;
-        }
-        
-        /**
-         * Use the following leaderElector instead of the one in the config.
-         * This is intended mostly for internal testing with Mocks.
-         */
-        public Builder withLeaderElector(ILeaderElector leaderElector) {
-            this.leaderElector = leaderElector;
-            return this;
-        }
-        
-        /**
-         * A tracked cluster can run tracked topologies.
-         * See {@link org.apache.storm.testing.TrackedTopology} for more 
information
-         * on tracked topologies.
-         * @param trackId an arbitrary unique id that is used to keep track of 
tracked topologies 
-         */
-        public Builder withTracked(String trackId) {
-            this.trackId = trackId;
-            return this;
-        }
-        
-        /**
-         * A tracked cluster can run tracked topologies.
-         * See {@link org.apache.storm.testing.TrackedTopology} for more 
information
-         * on tracked topologies.
-         */
-        public Builder withTracked() {
-            this.trackId = Utils.uuid();
-            return this;
-        }
-        
-        /**
-         * Builds a new LocalCluster.
-         * @return the LocalCluster
-         * @throws Exception on any one of many different errors.
-         * This is intended for testing so yes it is ugly and throws 
Exception...
-         */
-        public LocalCluster build() throws Exception {
-            return new LocalCluster(this);
-        }
+    static {
+        KILL_NOW.set_wait_secs(0);
     }
-    
-    private static class TrackedStormCommon extends StormCommon {
 
-        private final String id;
-        public TrackedStormCommon(String id) {
-            this.id = id;
-        }
-        
-        @Override
-        public IBolt makeAckerBoltImpl() {
-            return new NonRichBoltTracker(new Acker(), id);
-        }
-    }
-    
     private final Nimbus nimbus;
     //This is very private and does not need to be exposed
     private final AtomicInteger portCounter;
@@ -373,27 +141,28 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
     private final StormCommonInstaller commonInstaller;
     private final SimulatedTime time;
     private final NimbusClient.LocalOverride nimbusOverride;
-    
     /**
      * Create a default LocalCluster.
+     *
      * @throws Exception on any error
      */
     public LocalCluster() throws Exception {
         this(new 
Builder().withDaemonConf(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true));
     }
-    
+
     /**
      * Create a LocalCluster that connects to an existing Zookeeper instance.
+     *
      * @param zkHost the host for ZK
      * @param zkPort the port for ZK
      * @throws Exception on any error
      */
     public LocalCluster(String zkHost, Long zkPort) throws Exception {
         this(new 
Builder().withDaemonConf(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true)
-                .withDaemonConf(Config.STORM_ZOOKEEPER_SERVERS, 
Arrays.asList(zkHost))
-                .withDaemonConf(Config.STORM_ZOOKEEPER_PORT, zkPort));
+                          .withDaemonConf(Config.STORM_ZOOKEEPER_SERVERS, 
Arrays.asList(zkHost))
+                          .withDaemonConf(Config.STORM_ZOOKEEPER_PORT, 
zkPort));
     }
-    
+
     @SuppressWarnings("deprecation")
     private LocalCluster(Builder builder) throws Exception {
         if (builder.simulateTime) {
@@ -416,7 +185,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
             } else {
                 this.commonInstaller = null;
             }
-        
+
             this.tmpDirs = new ArrayList<>();
             this.supervisors = new ArrayList<>();
             TmpPath nimbusTmp = new TmpPath();
@@ -428,7 +197,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
             conf.put(Config.STORM_CLUSTER_MODE, "local");
             conf.put(Config.BLOBSTORE_SUPERUSER, 
System.getProperty("user.name"));
             conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath());
-        
+
             InProcessZookeeper zookeeper = null;
             if 
(!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) {
                 zookeeper = new InProcessZookeeper();
@@ -438,7 +207,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
             this.zookeeper = zookeeper;
             conf.putAll(builder.daemonConf);
             this.daemonConf = new HashMap<>(conf);
-        
+
             this.portCounter = new 
AtomicInteger(builder.supervisorSlotPortMin);
             ClusterStateContext cs = new 
ClusterStateContext(DaemonType.NIMBUS, daemonConf);
             this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, 
cs);
@@ -450,8 +219,8 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
             //Set it for nimbus only
             conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
             Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new 
StandaloneINimbus() : builder.inimbus,
-                this.getClusterState(), null, builder.store, 
builder.topoCache, builder.leaderElector,
-                builder.groupMapper);
+                                       this.getClusterState(), null, 
builder.store, builder.topoCache, builder.leaderElector,
+                                       builder.groupMapper);
             if (builder.nimbusWrapper != null) {
                 nimbus = builder.nimbusWrapper.apply(nimbus);
             }
@@ -464,11 +233,11 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
             }
             this.sharedContext = context;
             this.thriftServer = builder.nimbusDaemon ? 
startNimbusDaemon(this.daemonConf, this.nimbus) : null;
-        
+
             for (int i = 0; i < builder.supervisors; i++) {
                 addSupervisor(builder.portsPerSupervisor, null, null);
             }
-        
+
             //Wait for a leader to be elected (or topology submission can be 
rejected)
             try {
                 long timeoutAfter = System.currentTimeMillis() + 10_000;
@@ -479,7 +248,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
                     Thread.sleep(1);
                 }
             } catch (Exception e) {
-                //Ignore any exceptions we might be doing a test for 
authentication 
+                //Ignore any exceptions we might be doing a test for 
authentication
             }
             if (thriftServer == null) {
                 //We don't want to override the client if there is a thrift 
server up and running, or we would not test any
@@ -496,16 +265,88 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         }
     }
 
+    private static ThriftServer startNimbusDaemon(Map<String, Object> conf, 
Nimbus nimbus) {
+        ThriftServer ret = new ThriftServer(conf, new Processor<>(nimbus), 
ThriftConnectionType.NIMBUS);
+        LOG.info("Starting Nimbus server...");
+        new Thread(() -> ret.serve()).start();
+        return ret;
+    }
+
+    private static boolean areAllWorkersWaiting() {
+        boolean ret = true;
+        for (Shutdownable s : ProcessSimulator.getAllProcessHandles()) {
+            if (s instanceof DaemonCommon) {
+                ret = ret && ((DaemonCommon) s).isWaiting();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Run c with a local mode cluster overriding the NimbusClient and 
DRPCClient calls. NOTE local mode override happens by default now
+     * unless netty is turned on for the local cluster.
+     *
+     * @param c      the callable to run in this mode
+     * @param ttlSec the number of seconds to let the cluster run after c has 
completed
+     * @return the result of calling C
+     *
+     * @throws Exception on any Exception.
+     */
+    public static <T> T withLocalModeOverride(Callable<T> c, long ttlSec) 
throws Exception {
+        LOG.info("\n\n\t\tSTARTING LOCAL MODE CLUSTER\n\n");
+        try (LocalCluster local = new LocalCluster();
+             LocalDRPC drpc = new LocalDRPC();
+             DRPCClient.LocalOverride drpcOverride = new 
DRPCClient.LocalOverride(drpc)) {
+
+            T ret = c.call();
+            LOG.info("\n\n\t\tRUNNING LOCAL CLUSTER for {} seconds.\n\n", 
ttlSec);
+            Thread.sleep(ttlSec * 1000);
+
+            LOG.info("\n\n\t\tSTOPPING LOCAL MODE CLUSTER\n\n");
+            return ret;
+        }
+    }
+
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            throw new IllegalArgumentException("No class was specified to 
run");
+        }
+
+        long ttl = 20;
+        String ttlString = System.getProperty("storm.local.sleeptime", "20");
+        try {
+            ttl = Long.valueOf(ttlString);
+        } catch (NumberFormatException e) {
+            LOG.warn("could not parse the sleep time defaulting to {} 
seconds", ttl);
+        }
+
+        withLocalModeOverride(() -> {
+            String klass = args[0];
+            String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+            Class<?> c = Class.forName(klass);
+            Method main = c.getDeclaredMethod("main", String[].class);
+
+            LOG.info("\n\n\t\tRUNNING {} with args {}\n\n", main, 
Arrays.toString(newArgs));
+            main.invoke(null, (Object) newArgs);
+            return (Void) null;
+        }, ttl);
+
+        //Sometimes external things used with testing don't shut down all the 
way
+        System.exit(0);
+    }
+
     /**
      * Checks if Nimbuses have elected a leader.
+     *
      * @return boolean
+     *
      * @throws AuthorizationException
      * @throws TException
      */
     private boolean hasLeader() throws AuthorizationException, TException {
         ClusterSummary summary = getNimbus().getClusterInfo();
         if (summary.is_set_nimbuses()) {
-            for (NimbusSummary sum: summary.get_nimbuses()) {
+            for (NimbusSummary sum : summary.get_nimbuses()) {
                 if (sum.is_isLeader()) {
                     return true;
                 }
@@ -520,54 +361,22 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
     public Nimbus getNimbus() {
         return nimbus;
     }
-    
+
     /**
      * @return the base config for the daemons.
      */
     public Map<String, Object> getDaemonConf() {
         return new HashMap<>(daemonConf);
     }
-    
-    public static final KillOptions KILL_NOW = new KillOptions();
-
-    static {
-        KILL_NOW.set_wait_secs(0);
-    }
-    
-    /**
-     * When running a topology locally, for tests etc.  It is helpful to be 
sure
-     * that the topology is dead before the test exits.  This is an 
AutoCloseable
-     * topology that not only gives you access to the compiled StormTopology
-     * but also will kill the topology when it closes.
-     * 
-     * try (LocalTopology testTopo = cluster.submitTopology("testing", ...)) {
-     *   // Run Some test
-     * }
-     * // The topology has been killed
-     */
-    public class LocalTopology extends StormTopology implements ILocalTopology 
{
-        private static final long serialVersionUID = 6145919776650637748L;
-        private final String topoName;
-        
-        public LocalTopology(String topoName, StormTopology topo) {
-            super(topo);
-            this.topoName = topoName;
-        }
 
-        @Override
-        public void close() throws TException {
-            killTopologyWithOpts(topoName, KILL_NOW);
-        }
-    }
-    
     @Override
     public LocalTopology submitTopology(String topologyName, Map<String, 
Object> conf, StormTopology topology)
-            throws TException {
+        throws TException {
         if (!Utils.isValidConf(conf)) {
             throw new IllegalArgumentException("Topology conf is not 
json-serializable");
         }
         getNimbus().submitTopology(topologyName, null, 
JSONValue.toJSONString(conf), Utils.addVersions(topology));
-        
+
         ISubmitterHook hook = (ISubmitterHook) Utils.getConfiguredClass(conf, 
Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN);
         if (hook != null) {
             TopologyInfo topologyInfo = Utils.getTopologyInfo(topologyName, 
null, conf);
@@ -581,27 +390,29 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
     }
 
     @Override
-    public LocalTopology submitTopologyWithOpts(String topologyName, 
Map<String, Object> conf, StormTopology topology, SubmitOptions submitOpts)
-            throws TException {
+    public LocalTopology submitTopologyWithOpts(String topologyName, 
Map<String, Object> conf, StormTopology topology,
+                                                SubmitOptions submitOpts)
+        throws TException {
         if (!Utils.isValidConf(conf)) {
             throw new IllegalArgumentException("Topology conf is not 
json-serializable");
         }
-        getNimbus().submitTopologyWithOpts(topologyName, null, 
JSONValue.toJSONString(conf),  Utils.addVersions(topology), submitOpts);
+        getNimbus().submitTopologyWithOpts(topologyName, null, 
JSONValue.toJSONString(conf), Utils.addVersions(topology), submitOpts);
         return new LocalTopology(topologyName, topology);
     }
 
     @Override
     public LocalTopology submitTopology(String topologyName, Map<String, 
Object> conf, TrackedTopology topology)
-            throws TException {
+        throws TException {
         return submitTopology(topologyName, conf, topology.getTopology());
     }
 
     @Override
-    public LocalTopology submitTopologyWithOpts(String topologyName, 
Map<String, Object> conf, TrackedTopology topology, SubmitOptions submitOpts)
-            throws TException {
-            return submitTopologyWithOpts(topologyName, conf, 
topology.getTopology(), submitOpts);
+    public LocalTopology submitTopologyWithOpts(String topologyName, 
Map<String, Object> conf, TrackedTopology topology,
+                                                SubmitOptions submitOpts)
+        throws TException {
+        return submitTopologyWithOpts(topologyName, conf, 
topology.getTopology(), submitOpts);
     }
-    
+
     @Override
     public void uploadNewCredentials(String topologyName, Credentials creds) 
throws TException {
         getNimbus().uploadNewCredentials(topologyName, creds);
@@ -690,7 +501,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         if (getClusterState() != null) {
             getClusterState().disconnect();
         }
-        for (Supervisor s: supervisors) {
+        for (Supervisor s : supervisors) {
             s.shutdownAllWorkers(null, ReadClusterState.THREAD_DUMP_ON_ERROR);
             s.close();
         }
@@ -700,45 +511,47 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
             zookeeper.close();
             LOG.info("Done shutting down in process zookeeper");
         }
-        
-        for (TmpPath p: tmpDirs) {
+
+        for (TmpPath p : tmpDirs) {
             p.close();
         }
-        
+
         if (this.trackId != null) {
             LOG.warn("Clearing tracked metrics for ID {}", this.trackId);
             LocalExecutor.clearTrackId();
             RegisteredGlobalState.clearState(this.trackId);
         }
-        
+
         if (this.commonInstaller != null) {
             this.commonInstaller.close();
         }
-        
+
         if (time != null) {
             time.close();
         }
     }
-    
+
     /**
      * Get a specific Supervisor.  This is intended mostly for internal 
testing.
+     *
      * @param id the id of the supervisor
      */
     public synchronized Supervisor getSupervisor(String id) {
-        for (Supervisor s: supervisors) {
+        for (Supervisor s : supervisors) {
             if (id.equals(s.getId())) {
                 return s;
             }
         }
         return null;
     }
-    
+
     /**
      * Kill a specific supervisor.  This is intended mostly for internal 
testing.
+     *
      * @param id the id of the supervisor
      */
     public synchronized void killSupervisor(String id) {
-        for (Iterator<Supervisor> it = supervisors.iterator(); it.hasNext();) {
+        for (Iterator<Supervisor> it = supervisors.iterator(); it.hasNext(); ) 
{
             Supervisor s = it.next();
             if (id.equals(s.getId())) {
                 it.remove();
@@ -748,7 +561,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
             }
         }
     }
-    
+
     /**
      * Add another supervisor to the topology.  This is intended mostly for 
internal testing.
      */
@@ -758,26 +571,29 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
 
     /**
      * Add another supervisor to the topology.  This is intended mostly for 
internal testing.
+     *
      * @param ports the number of ports/slots the supervisor should have
      */
     public Supervisor addSupervisor(Number ports) throws Exception {
         return addSupervisor(ports, null, null);
     }
-    
+
     /**
      * Add another supervisor to the topology.  This is intended mostly for 
internal testing.
+     *
      * @param ports the number of ports/slots the supervisor should have
-     * @param id the id of the new supervisor, so you can find it later.
+     * @param id    the id of the new supervisor, so you can find it later.
      */
     public Supervisor addSupervisor(Number ports, String id) throws Exception {
         return addSupervisor(ports, null, id);
     }
-    
+
     /**
      * Add another supervisor to the topology.  This is intended mostly for 
internal testing.
+     *
      * @param ports the number of ports/slots the supervisor should have
-     * @param conf any config values that should be added/over written in the 
daemon conf of the cluster.
-     * @param id the id of the new supervisor, so you can find it later.
+     * @param conf  any config values that should be added/over written in the 
daemon conf of the cluster.
+     * @param id    the id of the new supervisor, so you can find it later.
      */
     public synchronized Supervisor addSupervisor(Number ports, Map<String, 
Object> conf, String id) throws Exception {
         if (ports == null) {
@@ -785,19 +601,19 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         }
         TmpPath tmpDir = new TmpPath();
         tmpDirs.add(tmpDir);
-        
+
         List<Integer> portNumbers = new ArrayList<>(ports.intValue());
         for (int i = 0; i < ports.intValue(); i++) {
             portNumbers.add(portCounter.getAndIncrement());
         }
-        
+
         Map<String, Object> superConf = new HashMap<>(daemonConf);
         if (conf != null) {
             superConf.putAll(conf);
         }
         superConf.put(Config.STORM_LOCAL_DIR, tmpDir.getPath());
         superConf.put(DaemonConfig.SUPERVISOR_SLOTS_PORTS, portNumbers);
-        
+
         final String superId = id == null ? Utils.uuid() : id;
         ISupervisor isuper = new StandaloneSupervisor() {
             @Override
@@ -808,7 +624,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         if (!ConfigUtils.isLocalMode(superConf)) {
             throw new IllegalArgumentException("Cannot start server in 
distrubuted mode!");
         }
-        
+
         Supervisor s = new Supervisor(superConf, sharedContext, isuper);
         s.launch();
         s.setLocalNimbus(this.nimbus);
@@ -816,51 +632,39 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         supervisors.add(s);
         return s;
     }
-    
+
     private boolean areAllSupervisorsWaiting() {
         boolean ret = true;
-        for (Supervisor s: supervisors) {
+        for (Supervisor s : supervisors) {
             ret = ret && s.isWaiting();
         }
         return ret;
     }
-    
-    private static boolean areAllWorkersWaiting() {
-        boolean ret = true;
-        for (Shutdownable s: ProcessSimulator.getAllProcessHandles()) {
-            if (s instanceof DaemonCommon) {
-                ret = ret && ((DaemonCommon)s).isWaiting();
-            }
-        }
-        return ret;
-    }
-    
+
     /**
-     * Wait for the cluster to be idle.  This is intended to be used with
-     * Simulated time and is for internal testing.
+     * Wait for the cluster to be idle.  This is intended to be used with 
Simulated time and is for internal testing.
+     *
      * @throws InterruptedException if interrupted while waiting.
-     * @throws AssertionError if the cluster did not come to an idle point with
-     * a timeout.
+     * @throws AssertionError       if the cluster did not come to an idle 
point with a timeout.
      */
     public void waitForIdle() throws InterruptedException {
         waitForIdle(Testing.TEST_TIMEOUT_MS);
     }
-    
+
     /**
-     * Wait for the cluster to be idle.  This is intended to be used with
-     * Simulated time and is for internal testing.
+     * Wait for the cluster to be idle.  This is intended to be used with 
Simulated time and is for internal testing.
+     *
      * @param timeoutMs the number of ms to wait before throwing an error.
      * @throws InterruptedException if interrupted while waiting.
-     * @throws AssertionError if the cluster did not come to an idle point with
-     * a timeout.
+     * @throws AssertionError       if the cluster did not come to an idle 
point with a timeout.
      */
     public void waitForIdle(long timeoutMs) throws InterruptedException {
         Random rand = ThreadLocalRandom.current();
         //wait until all workers, supervisors, and nimbus is waiting
         final long endTime = System.currentTimeMillis() + timeoutMs;
         while (!(nimbus.isWaiting() &&
-                areAllSupervisorsWaiting() &&
-                areAllWorkersWaiting())) {
+                 areAllSupervisorsWaiting() &&
+                 areAllWorkersWaiting())) {
             if (System.currentTimeMillis() >= endTime) {
                 LOG.info("Cluster was not idle in {} ms", timeoutMs);
                 LOG.info(Utils.threadDump());
@@ -869,12 +673,12 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
             Thread.sleep(rand.nextInt(20));
         }
     }
-    
+
     @Override
     public void advanceClusterTime(int secs) throws InterruptedException {
         advanceClusterTime(secs, 1);
     }
-    
+
     @Override
     public void advanceClusterTime(int secs, int incSecs) throws 
InterruptedException {
         for (int amountLeft = secs; amountLeft > 0; amountLeft -= incSecs) {
@@ -888,17 +692,15 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
     public IStormClusterState getClusterState() {
         return clusterState;
     }
-    
+
     @Override
     public String getTrackedId() {
         return trackId;
     }
 
-    //Nimbus Compatibility
-    
     @Override
     public void submitTopology(String name, String uploadedJarLocation, String 
jsonConf, StormTopology topology)
-            throws AlreadyAliveException, InvalidTopologyException, 
AuthorizationException, TException {
+        throws AlreadyAliveException, InvalidTopologyException, 
AuthorizationException, TException {
         try {
             @SuppressWarnings("unchecked")
             Map<String, Object> conf = (Map<String, Object>) 
JSONValue.parseWithException(jsonConf);
@@ -908,10 +710,12 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         }
     }
 
+    //Nimbus Compatibility
+
     @Override
     public void submitTopologyWithOpts(String name, String 
uploadedJarLocation, String jsonConf, StormTopology topology,
-            SubmitOptions options)
-            throws AlreadyAliveException, InvalidTopologyException, 
AuthorizationException, TException {
+                                       SubmitOptions options)
+        throws AlreadyAliveException, InvalidTopologyException, 
AuthorizationException, TException {
         try {
             @SuppressWarnings("unchecked")
             Map<String, Object> conf = (Map<String, Object>) 
JSONValue.parseWithException(jsonConf);
@@ -935,7 +739,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
 
     @Override
     public void debug(String name, String component, boolean enable, double 
samplingPercentage)
-            throws NotAliveException, AuthorizationException, TException {
+        throws NotAliveException, AuthorizationException, TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
@@ -948,14 +752,14 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
 
     @Override
     public List<ProfileRequest> getComponentPendingProfileActions(String id, 
String componentId, ProfileAction action)
-            throws TException {
+        throws TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
 
     @Override
     public String beginCreateBlob(String key, SettableBlobMeta meta)
-            throws AuthorizationException, KeyAlreadyExistsException, 
TException {
+        throws AuthorizationException, KeyAlreadyExistsException, TException {
         throw new RuntimeException("BLOBS NOT SUPPORTED IN LOCAL MODE");
     }
 
@@ -986,13 +790,13 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
 
     @Override
     public void setBlobMeta(String key, SettableBlobMeta meta)
-            throws AuthorizationException, KeyNotFoundException, TException {
+        throws AuthorizationException, KeyNotFoundException, TException {
         throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
     }
 
     @Override
     public BeginDownloadResult beginBlobDownload(String key)
-            throws AuthorizationException, KeyNotFoundException, TException {
+        throws AuthorizationException, KeyNotFoundException, TException {
         throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
     }
 
@@ -1021,7 +825,7 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
 
     @Override
     public int updateBlobReplication(String key, int replication)
-            throws AuthorizationException, KeyNotFoundException, TException {
+        throws AuthorizationException, KeyNotFoundException, TException {
         throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
     }
 
@@ -1075,28 +879,28 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
 
     @Override
     public TopologyInfo getTopologyInfoWithOpts(String id, GetInfoOptions 
options)
-            throws NotAliveException, AuthorizationException, TException {
+        throws NotAliveException, AuthorizationException, TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
 
     @Override
     public TopologyPageInfo getTopologyPageInfo(String id, String window, 
boolean is_include_sys)
-            throws NotAliveException, AuthorizationException, TException {
+        throws NotAliveException, AuthorizationException, TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
 
     @Override
     public SupervisorPageInfo getSupervisorPageInfo(String id, String host, 
boolean is_include_sys)
-            throws NotAliveException, AuthorizationException, TException {
+        throws NotAliveException, AuthorizationException, TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
 
     @Override
     public ComponentPageInfo getComponentPageInfo(String topology_id, String 
component_id, String window,
-            boolean is_include_sys) throws NotAliveException, 
AuthorizationException, TException {
+                                                  boolean is_include_sys) 
throws NotAliveException, AuthorizationException, TException {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
@@ -1112,29 +916,6 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
-    
-    /**
-     * Run c with a local mode cluster overriding the NimbusClient and 
DRPCClient calls.
-     * NOTE local mode override happens by default now unless netty is turned 
on for the local cluster.
-     * @param c the callable to run in this mode
-     * @param ttlSec the number of seconds to let the cluster run after c has 
completed
-     * @return the result of calling C
-     * @throws Exception on any Exception.
-     */
-    public static <T> T withLocalModeOverride(Callable<T> c, long ttlSec) 
throws Exception {
-        LOG.info("\n\n\t\tSTARTING LOCAL MODE CLUSTER\n\n");
-        try (LocalCluster local = new LocalCluster();
-                LocalDRPC drpc = new LocalDRPC();
-                DRPCClient.LocalOverride drpcOverride = new 
DRPCClient.LocalOverride(drpc)) {
-
-            T ret = c.call();
-            LOG.info("\n\n\t\tRUNNING LOCAL CLUSTER for {} seconds.\n\n", 
ttlSec);
-            Thread.sleep(ttlSec * 1000);
-            
-            LOG.info("\n\n\t\tSTOPPING LOCAL MODE CLUSTER\n\n");
-            return ret;
-        }
-    }
 
     @Override
     public List<OwnerResourceSummary> getOwnerResourceSummaries(String owner) 
throws AuthorizationException, TException {
@@ -1161,31 +942,243 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         getNimbus().processWorkerMetrics(metrics);
     }
 
-    public static void main(final String [] args) throws Exception {
-        if (args.length < 1) {
-            throw new IllegalArgumentException("No class was specified to 
run");
+    /**
+     * Simple way to configure a LocalCluster to meet your needs.
+     */
+    public static class Builder {
+        private int supervisors = 2;
+        private int portsPerSupervisor = 3;
+        private Map<String, Object> daemonConf = new HashMap<>();
+        private INimbus inimbus = null;
+        private IGroupMappingServiceProvider groupMapper = null;
+        private int supervisorSlotPortMin = 1024;
+        private boolean nimbusDaemon = false;
+        private UnaryOperator<Nimbus> nimbusWrapper = null;
+        private BlobStore store = null;
+        private TopoCache topoCache = null;
+        private IStormClusterState clusterState = null;
+        private ILeaderElector leaderElector = null;
+        private String trackId = null;
+        private boolean simulateTime = false;
+
+        /**
+         * Set the number of supervisors the cluster should have.
+         */
+        public Builder withSupervisors(int supervisors) {
+            if (supervisors < 0) {
+                throw new IllegalArgumentException("supervisors cannot be 
negative");
+            }
+            this.supervisors = supervisors;
+            return this;
         }
-        
-        long ttl = 20;
-        String ttlString = System.getProperty("storm.local.sleeptime", "20");
-        try {
-            ttl = Long.valueOf(ttlString);
-        } catch (NumberFormatException e) {
-            LOG.warn("could not parse the sleep time defaulting to {} 
seconds", ttl);
+
+        /**
+         * Set the number of slots/ports each supervisor should have.
+         */
+        public Builder withPortsPerSupervisor(int portsPerSupervisor) {
+            if (portsPerSupervisor < 0) {
+                throw new IllegalArgumentException("supervisor ports cannot be 
negative");
+            }
+            this.portsPerSupervisor = portsPerSupervisor;
+            return this;
+        }
+
+        /**
+         * Set the base config that the daemons should use.
+         */
+        public Builder withDaemonConf(Map<String, Object> conf) {
+            if (conf != null) {
+                this.daemonConf = new HashMap<>(conf);
+            }
+            return this;
+        }
+
+        /**
+         * Add a single key/value config to the daemon conf.
+         */
+        public Builder withDaemonConf(String key, Object value) {
+            this.daemonConf.put(key, value);
+            return this;
+        }
+
+        /**
+         * Override the INimbus instance that nimbus will use.
+         */
+        public Builder withINimbus(INimbus inimbus) {
+            this.inimbus = inimbus;
+            return this;
+        }
+
+        /**
+         * Override the code that maps users to groups for authorization.
+         */
+        public Builder withGroupMapper(IGroupMappingServiceProvider 
groupMapper) {
+            this.groupMapper = groupMapper;
+            return this;
+        }
+
+        /**
+         * When assigning ports to worker slots start at minPort.
+         */
+        public Builder withSupervisorSlotPortMin(Number minPort) {
+            int port = 1024;
+            if (minPort == null) {
+                LOG.warn("Number is null... {}", minPort);
+            } else {
+                port = minPort.intValue();
+            }
+            if (port <= 0) {
+                throw new IllegalArgumentException("port must be positive");
+            }
+            this.supervisorSlotPortMin = port;
+            return this;
+        }
+
+        /**
+         * Have the local nimbus actually launch a thrift server.  This is 
intended to be used mostly for internal storm testing.
+         */
+        public Builder withNimbusDaemon() {
+            return withNimbusDaemon(true);
+        }
+
+        /**
+         * If nimbusDaemon is true the local nimbus will launch a thrift 
server.  This is intended to be used mostly for internal storm
+         * testing.
+         */
+        public Builder withNimbusDaemon(Boolean nimbusDaemon) {
+            if (nimbusDaemon == null) {
+                nimbusDaemon = false;
+                LOG.warn("nimbusDaemon is null");
+            }
+            this.nimbusDaemon = nimbusDaemon;
+            return this;
+        }
+
+        /**
+         * Turn on simulated time in the cluster.  This allows someone to 
simulate long periods of time for timeouts etc when testing
+         * nimbus/supervisors themselves.  NOTE: this only works for code 
that uses the {@link org.apache.storm.utils.Time} class for
+         * time management so it will not work in all cases.
+         */
+        public Builder withSimulatedTime() {
+            return withSimulatedTime(true);
+        }
+
+        /**
+         * Turn on simulated time in the cluster.  This allows someone to 
simulate long periods of time for timeouts etc when testing
+         * nimbus/supervisors themselves.  NOTE: this only works for code 
that uses the {@link org.apache.storm.utils.Time} class for
+         * time management so it will not work in all cases.
+         */
+        public Builder withSimulatedTime(boolean simulateTime) {
+            this.simulateTime = simulateTime;
+            return this;
+        }
+
+        /**
+         * Before nimbus is created/used call nimbusWrapper on it first and 
use the result instead.  This is intended for internal testing
+         * only, and is here to allow a mocking framework to spy on the nimbus 
class.
+         */
+        public Builder withNimbusWrapper(UnaryOperator<Nimbus> nimbusWrapper) {
+            this.nimbusWrapper = nimbusWrapper;
+            return this;
+        }
+
+        /**
+         * Use the following blobstore instead of the one in the config. This 
is intended mostly for internal testing with Mocks.
+         */
+        public Builder withBlobStore(BlobStore store) {
+            this.store = store;
+            return this;
+        }
+
+        /**
+         * Use the following topo cache instead of creating our own. This is 
intended mostly for internal testing with Mocks.
+         */
+        public Builder withTopoCache(TopoCache topoCache) {
+            this.topoCache = topoCache;
+            return this;
+        }
+
+        /**
+         * Use the following clusterState instead of the one in the config. 
This is intended mostly for internal testing with Mocks.
+         */
+        public Builder withClusterState(IStormClusterState clusterState) {
+            this.clusterState = clusterState;
+            return this;
+        }
+
+        /**
+         * Use the following leaderElector instead of the one in the config. 
This is intended mostly for internal testing with Mocks.
+         */
+        public Builder withLeaderElector(ILeaderElector leaderElector) {
+            this.leaderElector = leaderElector;
+            return this;
+        }
+
+        /**
+         * A tracked cluster can run tracked topologies. See {@link 
org.apache.storm.testing.TrackedTopology} for more information on
+         * tracked topologies.
+         *
+         * @param trackId an arbitrary unique id that is used to keep track of 
tracked topologies
+         */
+        public Builder withTracked(String trackId) {
+            this.trackId = trackId;
+            return this;
+        }
+
+        /**
+         * A tracked cluster can run tracked topologies. See {@link 
org.apache.storm.testing.TrackedTopology} for more information on
+         * tracked topologies.
+         */
+        public Builder withTracked() {
+            this.trackId = Utils.uuid();
+            return this;
+        }
+
+        /**
+         * Builds a new LocalCluster.
+         *
+         * @return the LocalCluster
+         *
+         * @throws Exception on any one of many different errors. This is 
intended for testing so yes it is ugly and throws Exception...
+         */
+        public LocalCluster build() throws Exception {
+            return new LocalCluster(this);
+        }
+    }
+
+    private static class TrackedStormCommon extends StormCommon {
+
+        private final String id;
+
+        public TrackedStormCommon(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public IBolt makeAckerBoltImpl() {
+            return new NonRichBoltTracker(new Acker(), id);
+        }
+    }
+
+    /**
+     * When running a topology locally, for tests etc., it is helpful to be 
sure that the topology is dead before the test exits.  This is
+     * an AutoCloseable topology that not only gives you access to the 
compiled StormTopology but also will kill the topology when it
+     * closes.
+     *
+     * try (LocalTopology testTopo = cluster.submitTopology("testing", ...)) { 
// Run Some test } // The topology has been killed
+     */
+    public class LocalTopology extends StormTopology implements ILocalTopology 
{
+        private static final long serialVersionUID = 6145919776650637748L;
+        private final String topoName;
+
+        public LocalTopology(String topoName, StormTopology topo) {
+            super(topo);
+            this.topoName = topoName;
+        }
+
+        @Override
+        public void close() throws TException {
+            killTopologyWithOpts(topoName, KILL_NOW);
         }
-        
-        withLocalModeOverride(() -> {
-            String klass = args[0];
-            String [] newArgs = Arrays.copyOfRange(args, 1, args.length); 
-            Class<?> c = Class.forName(klass);
-            Method main = c.getDeclaredMethod("main", String[].class);
-            
-            LOG.info("\n\n\t\tRUNNING {} with args {}\n\n", main, 
Arrays.toString(newArgs));
-            main.invoke(null, (Object)newArgs);
-            return (Void)null;
-        }, ttl);
-        
-        //Sometimes external things used with testing don't shut down all the 
way
-        System.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalDRPC.java 
b/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
index bd60b62..ef257ed 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalDRPC.java
@@ -15,10 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm;
 
 import java.util.Map;
-
 import org.apache.storm.daemon.drpc.DRPC;
 import org.apache.storm.daemon.drpc.DRPCThrift;
 import org.apache.storm.generated.AuthorizationException;
@@ -30,10 +30,8 @@ import org.apache.thrift.TException;
 
 /**
  * A Local way to test DRPC
- * 
- * try (LocalDRPC drpc = new LocalDRPC()) {
- *   // Do tests
- * }
+ *
+ * try (LocalDRPC drpc = new LocalDRPC()) { // Do tests }
  */
 public class LocalDRPC implements ILocalDRPC {
 
@@ -65,7 +63,7 @@ public class LocalDRPC implements ILocalDRPC {
     public void failRequest(String id) throws AuthorizationException, 
TException {
         drpc.failRequest(id, null);
     }
-    
+
 
     @Override
     public void failRequestV2(String id, DRPCExecutionException e) throws 
AuthorizationException, TException {
@@ -82,7 +80,7 @@ public class LocalDRPC implements ILocalDRPC {
         ServiceRegistry.unregisterService(this.serviceId);
         drpc.close();
     }
-    
+
     @Override
     public void shutdown() {
         close();

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java 
b/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java
index 1388e07..f2e81cf 100644
--- a/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java
+++ b/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java
@@ -1,29 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm;
-import org.apache.storm.daemon.Shutdownable;
-import org.apache.storm.utils.Utils;
 
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,13 +26,13 @@ import org.slf4j.LoggerFactory;
  * in place of actual processes (in cluster mode).
  */
 public class ProcessSimulator {
+    protected static ConcurrentHashMap<String, Shutdownable> processMap = new 
ConcurrentHashMap<String, Shutdownable>();
     private static Logger LOG = 
LoggerFactory.getLogger(ProcessSimulator.class);
     private static Object lock = new Object();
-    protected static ConcurrentHashMap<String, Shutdownable> processMap = new 
ConcurrentHashMap<String, Shutdownable>();
 
     /**
      * Register a process' handle
-     * 
+     *
      * @param pid
      * @param shutdownable
      */
@@ -48,7 +42,7 @@ public class ProcessSimulator {
 
     /**
      * Get all process handles
-     * 
+     *
      * @return
      */
     public static Collection<Shutdownable> getAllProcessHandles() {
@@ -57,7 +51,7 @@ public class ProcessSimulator {
 
     /**
      * Kill a process
-     * 
+     *
      * @param pid
      */
     public static void killProcess(String pid) {
@@ -85,7 +79,7 @@ public class ProcessSimulator {
                     LOG.warn("process {} not killed (Ignoring 
InterruptedException)", pid, e);
                 } else if 
(Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, e)) {
                     LOG.warn("process {} not killed (Ignoring 
ClosedByInterruptException)", pid, e);
-                } else if (e instanceof RuntimeException){
+                } else if (e instanceof RuntimeException) {
                     throw e;
                 } else {
                     //TODO once everything is in java this should not be 
possible any more

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java 
b/storm-server/src/main/java/org/apache/storm/Testing.java
index c391926..5a8495b 100644
--- a/storm-server/src/main/java/org/apache/storm/Testing.java
+++ b/storm-server/src/main/java/org/apache/storm/Testing.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm;
@@ -29,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.GlobalStreamId;
@@ -66,13 +59,14 @@ import org.slf4j.LoggerFactory;
  * A utility that helps with testing topologies, Bolts and Spouts.
  */
 public class Testing {
-    private static final Logger LOG = LoggerFactory.getLogger(Testing.class);
     /**
-     * The default amount of wall time should be spent waiting for 
+     * The default amount of wall time that should be spent waiting for
      * specific conditions to happen.  Default is 10 seconds unless
      * the environment variable STORM_TEST_TIMEOUT_MS is set.
      */
     public static final int TEST_TIMEOUT_MS;
+    private static final Logger LOG = LoggerFactory.getLogger(Testing.class);
+
     static {
         int timeout = 10_000;
         try {
@@ -82,14 +76,7 @@ public class Testing {
         }
         TEST_TIMEOUT_MS = timeout;
     }
-    
-    /**
-     * Simply produces a boolean to see if a specific state is true or false.
-     */
-    public static interface Condition {
-        public boolean exec();
-    }
-    
+
     /**
      * Continue to execute body repeatedly until condition is true or 
TEST_TIMEOUT_MS has
      * passed
@@ -100,7 +87,7 @@ public class Testing {
     public static void whileTimeout(Condition condition, Runnable body) {
         whileTimeout(TEST_TIMEOUT_MS, condition, body);
     }
-    
+
     /**
      * Continue to execute body repeatedly until condition is true or 
TEST_TIMEOUT_MS has
      * passed
@@ -124,17 +111,17 @@ public class Testing {
         }
         LOG.debug("Condition met {}", condition);
     }
-    
+
     /**
      * Convenience method for data.stream.allMatch(pred)
      */
     public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred) {
         return data.stream().allMatch(pred);
     }
-    
+
     /**
      * Run with simulated time
-     * @deprecated use ``` 
+     * @deprecated use ```
      * try (Time.SimulatedTime time = new Time.SimulatedTime()) {
      *  ...
      * }
@@ -147,15 +134,15 @@ public class Testing {
             code.run();
         }
     }
-    
+
     private static LocalCluster cluster(MkClusterParam param, boolean 
simulated) throws Exception {
         return cluster(param, null, simulated);
     }
-    
+
     private static LocalCluster cluster(MkClusterParam param) throws Exception 
{
         return cluster(param, null, false);
     }
-    
+
     private static LocalCluster cluster(MkClusterParam param, String id, 
boolean simulated) throws Exception {
         Integer supervisors = param.getSupervisors();
         if (supervisors == null) {
@@ -170,18 +157,18 @@ public class Testing {
             conf = new HashMap<>();
         }
         return new LocalCluster.Builder()
-                .withSupervisors(supervisors)
-                .withPortsPerSupervisor(ports)
-                .withDaemonConf(conf)
-                .withNimbusDaemon(param.isNimbusDaemon())
-                .withTracked(id)
-                .withSimulatedTime(simulated)
-                .build();
+            .withSupervisors(supervisors)
+            .withPortsPerSupervisor(ports)
+            .withDaemonConf(conf)
+            .withNimbusDaemon(param.isNimbusDaemon())
+            .withTracked(id)
+            .withSimulatedTime(simulated)
+            .build();
     }
-    
+
     /**
      * Run with a local cluster
-     * @deprecated use ``` 
+     * @deprecated use ```
      * try (LocalCluster cluster = new LocalCluster()) {
      *  ...
      * }
@@ -192,10 +179,10 @@ public class Testing {
     public static void withLocalCluster(TestJob code) {
         withLocalCluster(new MkClusterParam(), code);
     }
-    
+
     /**
      * Run with a local cluster
-     * @deprecated use ``` 
+     * @deprecated use ```
      * try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
      *  ...
      * }
@@ -211,10 +198,10 @@ public class Testing {
             throw new RuntimeException(e);
         }
     }
-    
+
     /**
      * Run with a local cluster
-     * @deprecated use ``` 
+     * @deprecated use ```
      * try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
      *  ...
      * }
@@ -235,21 +222,21 @@ public class Testing {
         Boolean nimbusDaemon = (Boolean) 
clusterConf.getOrDefault("nimbus-daemon", false);
         try {
             return new LocalCluster.Builder()
-                    .withSupervisors(supervisors.intValue())
-                    .withDaemonConf(conf)
-                    .withPortsPerSupervisor(ports.intValue())
-                    .withINimbus(inimbus)
-                    .withSupervisorSlotPortMin(portMin)
-                    .withNimbusDaemon(nimbusDaemon)
-                    .build();
+                .withSupervisors(supervisors.intValue())
+                .withDaemonConf(conf)
+                .withPortsPerSupervisor(ports.intValue())
+                .withINimbus(inimbus)
+                .withSupervisorSlotPortMin(portMin)
+                .withNimbusDaemon(nimbusDaemon)
+                .build();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
-    
+
     /**
      * Run with a local cluster
-     * @deprecated use ``` 
+     * @deprecated use ```
      * try (LocalCluster cluster = new 
LocalCluster.Builder().withSimulatedTime().build()) {
      *  ...
      * }
@@ -260,10 +247,10 @@ public class Testing {
     public static void withSimulatedTimeLocalCluster(TestJob code) {
         withSimulatedTimeLocalCluster(new MkClusterParam(), code);
     }
-    
+
     /**
      * Run with a local cluster
-     * @deprecated use ``` 
+     * @deprecated use ```
      * try (LocalCluster cluster = new 
LocalCluster.Builder().withSimulatedTime()....build()) {
      *  ...
      * }
@@ -279,10 +266,10 @@ public class Testing {
             throw new RuntimeException(e);
         }
     }
-    
+
     /**
      * Run with a local cluster
-     * @deprecated use ``` 
+     * @deprecated use ```
      * try (LocalCluster cluster = new 
LocalCluster.Builder().withTracked().build()) {
      *  ...
      * }
@@ -293,7 +280,7 @@ public class Testing {
     public static void withTrackedCluster(TestJob code) {
         withTrackedCluster(new MkClusterParam(), code);
     }
-    
+
     /**
      * In a tracked topology some metrics are tracked.  This provides a way to 
get those metrics.
      * This is intended mostly for internal testing.
@@ -305,12 +292,12 @@ public class Testing {
     @Deprecated
     public static int globalAmt(String id, String key) {
         LOG.warn("Reading tracked metrics for ID {}", id);
-        return ((ConcurrentHashMap<String, AtomicInteger>)RegisteredGlobalState.getState(id)).get(key).get();
+        return ((ConcurrentHashMap<String, AtomicInteger>) RegisteredGlobalState.getState(id)).get(key).get();
     }
-    
+
     /**
      * Run with a local tracked cluster
-     * @deprecated use ``` 
+     * @deprecated use ```
      * try (LocalCluster cluster = new 
LocalCluster.Builder().withTracked()....build()) {
      *  ...
      * }
@@ -326,25 +313,7 @@ public class Testing {
             throw new RuntimeException(e);
         }
     }
-    
-    /**
-     * A topology that has all messages captured and can be read later on.
-     * This is intended mostly for internal testing.
-     * @param <T> the topology (tracked or regular)
-     */
-    public static final class CapturedTopology<T> {
-        public final T topology;
-        /**
-         * a Bolt that will hold all of the captured data.
-         */
-        public final TupleCaptureBolt capturer;
-        
-        public CapturedTopology(T topology, TupleCaptureBolt capturer) {
-            this.topology = topology;
-            this.capturer = capturer;
-        }
-    }
-    
+
     /**
      * Track and capture a topology.
      * This is intended mostly for internal testing.
@@ -353,7 +322,7 @@ public class Testing {
         CapturedTopology<StormTopology> captured = captureTopology(topology);
         return new CapturedTopology<>(new TrackedTopology(captured.topology, 
cluster), captured.capturer);
     }
-    
+
     /**
      * Rewrites a topology so that all the tuples flowing through it are 
captured
      * @param topology the topology to rewrite
@@ -362,7 +331,7 @@ public class Testing {
      */
     public static CapturedTopology<StormTopology> 
captureTopology(StormTopology topology) {
         topology = topology.deepCopy(); //Don't modify the original
-        
+
         TupleCaptureBolt capturer = new TupleCaptureBolt();
         Map<GlobalStreamId, Grouping> captureBoltInputs = new HashMap<>();
         for (Map.Entry<String, SpoutSpec> spoutEntry : 
topology.get_spouts().entrySet()) {
@@ -377,7 +346,7 @@ public class Testing {
                 }
             }
         }
-        
+
         for (Entry<String, Bolt> boltEntry : topology.get_bolts().entrySet()) {
             String id = boltEntry.getKey();
             for (Entry<String, StreamInfo> streamEntry : 
boltEntry.getValue().get_common().get_streams().entrySet()) {
@@ -391,35 +360,36 @@ public class Testing {
             }
         }
         topology.put_to_bolts(Utils.uuid(), new 
Bolt(Thrift.serializeComponentObject(capturer),
-                Thrift.prepareComponentCommon(captureBoltInputs, new 
HashMap<>(), null)));
+                                                     
Thrift.prepareComponentCommon(captureBoltInputs, new HashMap<>(), null)));
         return new CapturedTopology<>(topology, capturer);
     }
-    
+
     /**
      * Run a topology to completion capturing all of the messages that are 
emitted.  This only works when all of the spouts are
      * instances of {@link org.apache.storm.testing.CompletableSpout}
      * @param cluster the cluster to submit the topology to
      * @param topology the topology itself
-     * @return a map of the component to the list of tuples it emitted. 
+     * @return a map of the component to the list of tuples it emitted.
      * @throws InterruptedException
      * @throws TException on any error from nimbus.
      */
-    public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, TException {
+    public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException,
+        TException {
         return completeTopology(cluster, topology, new 
CompleteTopologyParam());
     }
-    
+
     /**
      * Run a topology to completion capturing all of the messages that are 
emitted.  This only works when all of the spouts are
      * instances of {@link org.apache.storm.testing.CompletableSpout} or are 
overwritten by MockedSources in param
      * @param cluster the cluster to submit the topology to
      * @param topology the topology itself
      * @param param parameters to describe how to complete a topology.
-     * @return a map of the component to the list of tuples it emitted. 
+     * @return a map of the component to the list of tuples it emitted.
      * @throws InterruptedException
      * @throws TException on any error from nimbus.
      */
     public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster 
cluster, StormTopology topology,
-            CompleteTopologyParam param) throws TException, 
InterruptedException {
+                                                                 
CompleteTopologyParam param) throws TException, InterruptedException {
         Map<String, List<FixedTuple>> ret = null;
         IStormClusterState state = cluster.getClusterState();
         CapturedTopology<StormTopology> capTopo = captureTopology(topology);
@@ -432,30 +402,31 @@ public class Testing {
         Map<String, SpoutSpec> spouts = topology.get_spouts();
         MockedSources ms = param.getMockedSources();
         if (ms != null) {
-            for (Entry<String, List<FixedTuple>> mocked: 
ms.getData().entrySet()) {
+            for (Entry<String, List<FixedTuple>> mocked : 
ms.getData().entrySet()) {
                 FixedTupleSpout newSpout = new 
FixedTupleSpout(mocked.getValue());
                 
spouts.get(mocked.getKey()).set_spout_object(Thrift.serializeComponentObject(newSpout));
             }
         }
         List<Object> spoutObjects = spouts.values().stream().
-                map((spec) -> 
Thrift.deserializeComponentObject(spec.get_spout_object())).collect(Collectors.toList());
-        
-        for (Object o: spoutObjects) {
+            map((spec) -> 
Thrift.deserializeComponentObject(spec.get_spout_object())).collect(Collectors.toList());
+
+        for (Object o : spoutObjects) {
             if (!(o instanceof CompletableSpout)) {
-                throw new RuntimeException("Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " + o);
+                throw new RuntimeException(
+                    "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " + o);
             }
         }
-        
-        for (Object spout: spoutObjects) {
-            ((CompletableSpout)spout).startup();
+
+        for (Object spout : spoutObjects) {
+            ((CompletableSpout) spout).startup();
         }
-        
+
         cluster.submitTopology(topoName, param.getStormConf(), topology);
-        
+
         if (Time.isSimulating()) {
             cluster.advanceClusterTime(11);
         }
-        
+
         String topoId = state.getTopoId(topoName).get();
         //Give the topology time to come up without using it to wait for the 
spouts to complete
         simulateWait(cluster);
@@ -464,41 +435,41 @@ public class Testing {
             timeoutMs = TEST_TIMEOUT_MS;
         }
         whileTimeout(timeoutMs,
-                () -> !isEvery(spoutObjects, (o) -> 
((CompletableSpout)o).isExhausted()),
-                () -> {
-                    try {
-                        simulateWait(cluster);
-                    } catch (Exception e) {
-                        throw new RuntimeException();
-                    }
-                });
+                     () -> !isEvery(spoutObjects, (o) -> ((CompletableSpout) 
o).isExhausted()),
+                     () -> {
+                         try {
+                             simulateWait(cluster);
+                         } catch (Exception e) {
+                             throw new RuntimeException();
+                         }
+                     });
 
         KillOptions killOpts = new KillOptions();
         killOpts.set_wait_secs(0);
         cluster.killTopologyWithOpts(topoName, killOpts);
-        
+
         whileTimeout(timeoutMs,
-                () -> state.assignmentInfo(topoId, null) != null,
-                () -> {
-                    try {
-                        simulateWait(cluster);
-                    } catch (Exception e) {
-                        throw new RuntimeException();
-                    }
-                });
-        
+                     () -> state.assignmentInfo(topoId, null) != null,
+                     () -> {
+                         try {
+                             simulateWait(cluster);
+                         } catch (Exception e) {
+                             throw new RuntimeException();
+                         }
+                     });
+
         if (param.getCleanupState()) {
             for (Object o : spoutObjects) {
-                ((CompletableSpout)o).clean();
+                ((CompletableSpout) o).clean();
             }
             ret = capTopo.capturer.getAndRemoveResults();
         } else {
             ret = capTopo.capturer.getAndClearResults();
         }
-        
+
         return ret;
     }
-    
+
     /**
      * If using simulated time simulate waiting for 10 seconds.  This is 
intended for internal testing only.
      */
@@ -508,7 +479,7 @@ public class Testing {
             Thread.sleep(100);
         }
     }
-    
+
     /**
      * Get all of the tuples from a given component on the default stream
      * @param results the results of running a completed topology
@@ -530,7 +501,7 @@ public class Testing {
         List<List<Object>> ret = new ArrayList<>();
         List<FixedTuple> streamResult = results.get(componentId);
         if (streamResult != null) {
-            for (FixedTuple tuple: streamResult) {
+            for (FixedTuple tuple : streamResult) {
                 if (streamId.equals(tuple.stream)) {
                     ret.add(tuple.values);
                 }
@@ -538,7 +509,7 @@ public class Testing {
         }
         return ret;
     }
-    
+
     /**
      * Create a tracked topology.
      * @deprecated use {@link org.apache.storm.testing.TrackedTopology} 
directly.
@@ -547,63 +518,63 @@ public class Testing {
     public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, 
StormTopology topology) {
         return new TrackedTopology(topology, cluster);
     }
-    
+
     /**
      * Simulated time wait for a tracked topology.  This is intended for 
internal testing
      */
     public static void trackedWait(CapturedTopology<TrackedTopology> topo) {
         topo.topology.trackedWait();
     }
-    
+
     /**
      * Simulated time wait for a tracked topology.  This is intended for 
internal testing
      */
     public static void trackedWait(CapturedTopology<TrackedTopology> topo, 
Integer amt) {
         topo.topology.trackedWait(amt);
     }
-    
+
     /**
      * Simulated time wait for a tracked topology.  This is intended for 
internal testing
      */
     public static void trackedWait(CapturedTopology<TrackedTopology> topo, 
Integer amt, Integer timeoutMs) {
         topo.topology.trackedWait(amt, timeoutMs);
     }
-    
+
     /**
      * Simulated time wait for a tracked topology.  This is intended for 
internal testing
      */
     public static void trackedWait(TrackedTopology topo) {
         topo.trackedWait();
     }
-    
+
     /**
      * Simulated time wait for a tracked topology.  This is intended for 
internal testing
      */
     public static void trackedWait(TrackedTopology topo, Integer amt) {
         topo.trackedWait(amt);
     }
-    
+
     /**
      * Simulated time wait for a tracked topology.  This is intended for 
internal testing
      */
     public static void trackedWait(TrackedTopology topo, Integer amt, Integer 
timeoutMs) {
         topo.trackedWait(amt, timeoutMs);
     }
-    
+
     /**
      * Simulated time wait for a cluster.  This is intended for internal 
testing
      */
     public static void advanceClusterTime(ILocalCluster cluster, Integer secs) 
throws InterruptedException {
         advanceClusterTime(cluster, secs, 1);
     }
-    
+
     /**
      * Simulated time wait for a cluster.  This is intended for internal 
testing
      */
     public static void advanceClusterTime(ILocalCluster cluster, Integer secs, 
Integer step) throws InterruptedException {
         cluster.advanceClusterTime(secs, step);
     }
-    
+
     /**
      * Count how many times each element appears in the Collection
      * @param c a collection of values
@@ -611,7 +582,7 @@ public class Testing {
      */
     public static <T> Map<T, Integer> multiset(Collection<T> c) {
         Map<T, Integer> ret = new HashMap<T, Integer>();
-        for (T t: c) {
+        for (T t : c) {
             Integer i = ret.get(t);
             if (i == null) {
                 i = new Integer(0);
@@ -621,40 +592,40 @@ public class Testing {
         }
         return ret;
     }
-    
+
     private static void printRec(Object o, String prefix) {
         if (o instanceof Collection) {
-            LOG.info("{} {} ({}) [",prefix,o, o.getClass());
-            for (Object sub: (Collection)o) {
+            LOG.info("{} {} ({}) [", prefix, o, o.getClass());
+            for (Object sub : (Collection) o) {
                 printRec(sub, prefix + "  ");
             }
-            LOG.info("{} ]",prefix);
+            LOG.info("{} ]", prefix);
         } else if (o instanceof Map) {
-            Map<?,?> m = (Map<?,?>)o;
-            LOG.info("{} {} ({}) {",prefix,o, o.getClass());
-            for (Map.Entry<?, ?> entry: m.entrySet()) {
+            Map<?, ?> m = (Map<?, ?>) o;
+            LOG.info("{} {} ({}) {", prefix, o, o.getClass());
+            for (Map.Entry<?, ?> entry : m.entrySet()) {
                 printRec(entry.getKey(), prefix + "  ");
                 LOG.info("{} ->", prefix);
                 printRec(entry.getValue(), prefix + "  ");
             }
-            LOG.info("{} }",prefix);
+            LOG.info("{} }", prefix);
         } else {
             LOG.info("{} {} ({})", prefix, o, o.getClass());
         }
     }
-    
+
     /**
      * Check if two collections are equivalent ignoring the order of elements.
      */
     public static <T> boolean multiseteq(Collection<T> a, Collection<T> b) {
         boolean ret = multiset(a).equals(multiset(b));
-        if (! ret) {
+        if (!ret) {
             printRec(multiset(a), "MS-A:");
             printRec(multiset(b), "MS-B:");
         }
         return ret;
     }
-    
+
     /**
      * Create a {@link org.apache.storm.tuple.Tuple} for use with testing
      * @param values the values to appear in the tuple
@@ -662,7 +633,7 @@ public class Testing {
     public static Tuple testTuple(List<Object> values) {
         return testTuple(values, new MkTupleParam());
     }
-    
+
     /**
      * Create a {@link org.apache.storm.tuple.Tuple} for use with testing
      * @param values the values to appear in the tuple
@@ -673,19 +644,19 @@ public class Testing {
         if (stream == null) {
             stream = Utils.DEFAULT_STREAM_ID;
         }
-        
+
         String component = param.getComponent();
         if (component == null) {
             component = "component";
         }
-        
+
         int task = 1;
-        
+
         List<String> fields = param.getFields();
         if (fields == null) {
             fields = new ArrayList<>(values.size());
             for (int i = 1; i <= values.size(); i++) {
-                fields.add("field"+i);
+                fields.add("field" + i);
             }
         }
 
@@ -696,23 +667,48 @@ public class Testing {
         streamToFields.put(stream, new Fields(fields));
         compToStreamToFields.put(component, streamToFields);
 
-        TopologyContext context= new TopologyContext(null,
-                ConfigUtils.readStormConfig(),
-                taskToComp,
-                null,
-                compToStreamToFields,
-                null,
-                "test-storm-id",
-                null,
-                null,
-                1,
-                null,
-                null,
-                new HashMap<>(),
-                new HashMap<>(),
-                new HashMap<>(),
-                new HashMap<>(),
-                new AtomicBoolean(false));
+        TopologyContext context = new TopologyContext(null,
+                                                      
ConfigUtils.readStormConfig(),
+                                                      taskToComp,
+                                                      null,
+                                                      compToStreamToFields,
+                                                      null,
+                                                      "test-storm-id",
+                                                      null,
+                                                      null,
+                                                      1,
+                                                      null,
+                                                      null,
+                                                      new HashMap<>(),
+                                                      new HashMap<>(),
+                                                      new HashMap<>(),
+                                                      new HashMap<>(),
+                                                      new 
AtomicBoolean(false));
         return new TupleImpl(context, values, component, 1, stream);
     }
+
+    /**
+     * Simply produces a boolean to see if a specific state is true or false.
+     */
+    public static interface Condition {
+        public boolean exec();
+    }
+
+    /**
+     * A topology that has all messages captured and can be read later on.
+     * This is intended mostly for internal testing.
+     * @param <T> the topology (tracked or regular)
+     */
+    public static final class CapturedTopology<T> {
+        public final T topology;
+        /**
+         * a Bolt that will hold all of the captured data.
+         */
+        public final TupleCaptureBolt capturer;
+
+        public CapturedTopology(T topology, TupleCaptureBolt capturer) {
+            this.topology = topology;
+            this.capturer = capturer;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java
index 8a01f1e..e50575a 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java
@@ -1,40 +1,35 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.blobstore;
 
 public class BlobKeySequenceInfo {
     private String nimbusHostPort;
     private String sequenceNumber;
 
-    public void setNimbusHostPort(String nimbusHostPort) {
-     this.nimbusHostPort = nimbusHostPort;
-    }
-
-    public void setSequenceNumber(String sequenceNumber) {
-        this.sequenceNumber = sequenceNumber;
-    }
-
     public String getNimbusHostPort() {
         return nimbusHostPort;
     }
 
+    public void setNimbusHostPort(String nimbusHostPort) {
+        this.nimbusHostPort = nimbusHostPort;
+    }
+
     public String getSequenceNumber() {
         return sequenceNumber;
     }
 
+    public void setSequenceNumber(String sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+
 }

Reply via email to