http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/LocalCluster.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index 7db41b5..2db0240 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -74,10 +74,10 @@ import org.apache.storm.generated.RebalanceOptions; import org.apache.storm.generated.SettableBlobMeta; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.SubmitOptions; -import org.apache.storm.generated.SupervisorPageInfo; import org.apache.storm.generated.SupervisorAssignments; -import org.apache.storm.generated.SupervisorWorkerHeartbeats; +import org.apache.storm.generated.SupervisorPageInfo; import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.generated.SupervisorWorkerHeartbeats; import org.apache.storm.generated.TopologyHistoryInfo; import org.apache.storm.generated.TopologyInfo; import org.apache.storm.generated.TopologyPageInfo; @@ -111,253 +111,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A stand alone storm cluster that runs inside a single process. - * It is intended to be used for testing. Both internal testing for - * Apache Storm itself and for people building storm topologies. - *<p> - * LocalCluster is an AutoCloseable so if you are using it in tests you can use - * a try block to be sure it is shut down. + * A stand alone storm cluster that runs inside a single process. It is intended to be used for testing. Both internal testing for Apache + * Storm itself and for people building storm topologies. + * <p> + * LocalCluster is an AutoCloseable so if you are using it in tests you can use a try block to be sure it is shut down. * </p> - * try (LocalCluster cluster = new LocalCluster()) { - * // Do some tests - * } - * // The cluster has been shut down. + * try (LocalCluster cluster = new LocalCluster()) { // Do some tests } // The cluster has been shut down. */ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { + public static final KillOptions KILL_NOW = new KillOptions(); private static final Logger LOG = LoggerFactory.getLogger(LocalCluster.class); - - private static ThriftServer startNimbusDaemon(Map<String, Object> conf, Nimbus nimbus) { - ThriftServer ret = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS); - LOG.info("Starting Nimbus server..."); - new Thread(() -> ret.serve()).start(); - return ret; - } - - /** - * Simple way to configure a LocalCluster to meet your needs. - */ - public static class Builder { - private int supervisors = 2; - private int portsPerSupervisor = 3; - private Map<String, Object> daemonConf = new HashMap<>(); - private INimbus inimbus = null; - private IGroupMappingServiceProvider groupMapper = null; - private int supervisorSlotPortMin = 1024; - private boolean nimbusDaemon = false; - private UnaryOperator<Nimbus> nimbusWrapper = null; - private BlobStore store = null; - private TopoCache topoCache = null; - private IStormClusterState clusterState = null; - private ILeaderElector leaderElector = null; - private String trackId = null; - private boolean simulateTime = false; - - /** - * Set the number of supervisors the cluster should have. - */ - public Builder withSupervisors(int supervisors) { - if (supervisors < 0) { - throw new IllegalArgumentException("supervisors cannot be negative"); - } - this.supervisors = supervisors; - return this; - } - - /** - * Set the number of slots/ports each supervisor should have. - */ - public Builder withPortsPerSupervisor(int portsPerSupervisor) { - if (portsPerSupervisor < 0) { - throw new IllegalArgumentException("supervisor ports cannot be negative"); - } - this.portsPerSupervisor = portsPerSupervisor; - return this; - } - - /** - * Set the base config that the daemons should use. - */ - public Builder withDaemonConf(Map<String, Object> conf) { - if (conf != null) { - this.daemonConf = new HashMap<>(conf); - } - return this; - } - - /** - * Add an single key/value config to the daemon conf. - */ - public Builder withDaemonConf(String key, Object value) { - this.daemonConf.put(key, value); - return this; - } - - /** - * Override the INimbus instance that nimbus will use. - */ - public Builder withINimbus(INimbus inimbus) { - this.inimbus = inimbus; - return this; - } - - /** - * Override the code that maps users to groups for authorization. - */ - public Builder withGroupMapper(IGroupMappingServiceProvider groupMapper) { - this.groupMapper = groupMapper; - return this; - } - - /** - * When assigning ports to worker slots start at minPort. - */ - public Builder withSupervisorSlotPortMin(Number minPort) { - int port = 1024; - if (minPort == null) { - LOG.warn("Number is null... {}", minPort); - } else { - port = minPort.intValue(); - } - if (port <= 0) { - throw new IllegalArgumentException("port must be positive"); - } - this.supervisorSlotPortMin = port; - return this; - } - - /** - * Have the local nimbus actually launch a thrift server. This is intended to - * be used mostly for internal storm testing. - */ - public Builder withNimbusDaemon() { - return withNimbusDaemon(true); - } - - /** - * If nimbusDaemon is true the local nimbus will launch a thrift server. This is intended to - * be used mostly for internal storm testing. - */ - public Builder withNimbusDaemon(Boolean nimbusDaemon) { - if (nimbusDaemon == null) { - nimbusDaemon = false; - LOG.warn("nimbusDaemon is null"); - } - this.nimbusDaemon = nimbusDaemon; - return this; - } - - /** - * Turn on simulated time in the cluster. This allows someone to simulate long periods of - * time for timeouts etc when testing nimbus/supervisors themselves. NOTE: that this only - * works for code that uses the {@link org.apache.storm.utils.Time} class for time management - * so it will not work in all cases. - */ - public Builder withSimulatedTime() { - return withSimulatedTime(true); - } - - /** - * Turn on simulated time in the cluster. This allows someone to simulate long periods of - * time for timeouts etc when testing nimbus/supervisors themselves. NOTE: that this only - * works for code that uses the {@link org.apache.storm.utils.Time} class for time management - * so it will not work in all cases. - */ - public Builder withSimulatedTime(boolean simulateTime) { - this.simulateTime = simulateTime; - return this; - } - - /** - * Before nimbus is created/used call nimbusWrapper on it first and use the - * result instead. This is intended for internal testing only, and it here to - * allow a mocking framework to spy on the nimbus class. - */ - public Builder withNimbusWrapper(UnaryOperator<Nimbus> nimbusWrapper) { - this.nimbusWrapper = nimbusWrapper; - return this; - } - - /** - * Use the following blobstore instead of the one in the config. - * This is intended mostly for internal testing with Mocks. - */ - public Builder withBlobStore(BlobStore store) { - this.store = store; - return this; - } - - /** - * Use the following topo cache instead of creating out own. - * This is intended mostly for internal testing with Mocks. - */ - public Builder withTopoCache(TopoCache topoCache) { - this.topoCache = topoCache; - return this; - } - /** - * Use the following clusterState instead of the one in the config. - * This is intended mostly for internal testing with Mocks. - */ - public Builder withClusterState(IStormClusterState clusterState) { - this.clusterState = clusterState; - return this; - } - - /** - * Use the following leaderElector instead of the one in the config. - * This is intended mostly for internal testing with Mocks. - */ - public Builder withLeaderElector(ILeaderElector leaderElector) { - this.leaderElector = leaderElector; - return this; - } - - /** - * A tracked cluster can run tracked topologies. - * See {@link org.apache.storm.testing.TrackedTopology} for more information - * on tracked topologies. - * @param trackId an arbitrary unique id that is used to keep track of tracked topologies - */ - public Builder withTracked(String trackId) { - this.trackId = trackId; - return this; - } - - /** - * A tracked cluster can run tracked topologies. - * See {@link org.apache.storm.testing.TrackedTopology} for more information - * on tracked topologies. - */ - public Builder withTracked() { - this.trackId = Utils.uuid(); - return this; - } - - /** - * Builds a new LocalCluster. - * @return the LocalCluster - * @throws Exception on any one of many different errors. - * This is intended for testing so yes it is ugly and throws Exception... - */ - public LocalCluster build() throws Exception { - return new LocalCluster(this); - } + static { + KILL_NOW.set_wait_secs(0); } - - private static class TrackedStormCommon extends StormCommon { - private final String id; - public TrackedStormCommon(String id) { - this.id = id; - } - - @Override - public IBolt makeAckerBoltImpl() { - return new NonRichBoltTracker(new Acker(), id); - } - } - private final Nimbus nimbus; //This is very private and does not need to be exposed private final AtomicInteger portCounter; @@ -373,27 +141,28 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { private final StormCommonInstaller commonInstaller; private final SimulatedTime time; private final NimbusClient.LocalOverride nimbusOverride; - /** * Create a default LocalCluster. + * * @throws Exception on any error */ public LocalCluster() throws Exception { this(new Builder().withDaemonConf(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true)); } - + /** * Create a LocalCluster that connects to an existing Zookeeper instance. + * * @param zkHost the host for ZK * @param zkPort the port for ZK * @throws Exception on any error */ public LocalCluster(String zkHost, Long zkPort) throws Exception { this(new Builder().withDaemonConf(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true) - .withDaemonConf(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkHost)) - .withDaemonConf(Config.STORM_ZOOKEEPER_PORT, zkPort)); + .withDaemonConf(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkHost)) + .withDaemonConf(Config.STORM_ZOOKEEPER_PORT, zkPort)); } - + @SuppressWarnings("deprecation") private LocalCluster(Builder builder) throws Exception { if (builder.simulateTime) { @@ -416,7 +185,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { } else { this.commonInstaller = null; } - + this.tmpDirs = new ArrayList<>(); this.supervisors = new ArrayList<>(); TmpPath nimbusTmp = new TmpPath(); @@ -428,7 +197,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { conf.put(Config.STORM_CLUSTER_MODE, "local"); conf.put(Config.BLOBSTORE_SUPERUSER, System.getProperty("user.name")); conf.put(Config.BLOBSTORE_DIR, nimbusTmp.getPath()); - + InProcessZookeeper zookeeper = null; if (!builder.daemonConf.containsKey(Config.STORM_ZOOKEEPER_SERVERS)) { zookeeper = new InProcessZookeeper(); @@ -438,7 +207,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { this.zookeeper = zookeeper; conf.putAll(builder.daemonConf); this.daemonConf = new HashMap<>(conf); - + this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin); ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf); this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs); @@ -450,8 +219,8 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { //Set it for nimbus only conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath()); Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus, - this.getClusterState(), null, builder.store, builder.topoCache, builder.leaderElector, - builder.groupMapper); + this.getClusterState(), null, builder.store, builder.topoCache, builder.leaderElector, + builder.groupMapper); if (builder.nimbusWrapper != null) { nimbus = builder.nimbusWrapper.apply(nimbus); } @@ -464,11 +233,11 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { } this.sharedContext = context; this.thriftServer = builder.nimbusDaemon ? startNimbusDaemon(this.daemonConf, this.nimbus) : null; - + for (int i = 0; i < builder.supervisors; i++) { addSupervisor(builder.portsPerSupervisor, null, null); } - + //Wait for a leader to be elected (or topology submission can be rejected) try { long timeoutAfter = System.currentTimeMillis() + 10_000; @@ -479,7 +248,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { Thread.sleep(1); } } catch (Exception e) { - //Ignore any exceptions we might be doing a test for authentication + //Ignore any exceptions we might be doing a test for authentication } if (thriftServer == null) { //We don't want to override the client if there is a thrift server up and running, or we would not test any @@ -496,16 +265,88 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { } } + private static ThriftServer startNimbusDaemon(Map<String, Object> conf, Nimbus nimbus) { + ThriftServer ret = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS); + LOG.info("Starting Nimbus server..."); + new Thread(() -> ret.serve()).start(); + return ret; + } + + private static boolean areAllWorkersWaiting() { + boolean ret = true; + for (Shutdownable s : ProcessSimulator.getAllProcessHandles()) { + if (s instanceof DaemonCommon) { + ret = ret && ((DaemonCommon) s).isWaiting(); + } + } + return ret; + } + + /** + * Run c with a local mode cluster overriding the NimbusClient and DRPCClient calls. NOTE local mode override happens by default now + * unless netty is turned on for the local cluster. + * + * @param c the callable to run in this mode + * @param ttlSec the number of seconds to let the cluster run after c has completed + * @return the result of calling C + * + * @throws Exception on any Exception. + */ + public static <T> T withLocalModeOverride(Callable<T> c, long ttlSec) throws Exception { + LOG.info("\n\n\t\tSTARTING LOCAL MODE CLUSTER\n\n"); + try (LocalCluster local = new LocalCluster(); + LocalDRPC drpc = new LocalDRPC(); + DRPCClient.LocalOverride drpcOverride = new DRPCClient.LocalOverride(drpc)) { + + T ret = c.call(); + LOG.info("\n\n\t\tRUNNING LOCAL CLUSTER for {} seconds.\n\n", ttlSec); + Thread.sleep(ttlSec * 1000); + + LOG.info("\n\n\t\tSTOPPING LOCAL MODE CLUSTER\n\n"); + return ret; + } + } + + public static void main(final String[] args) throws Exception { + if (args.length < 1) { + throw new IllegalArgumentException("No class was specified to run"); + } + + long ttl = 20; + String ttlString = System.getProperty("storm.local.sleeptime", "20"); + try { + ttl = Long.valueOf(ttlString); + } catch (NumberFormatException e) { + LOG.warn("could not parse the sleep time defaulting to {} seconds", ttl); + } + + withLocalModeOverride(() -> { + String klass = args[0]; + String[] newArgs = Arrays.copyOfRange(args, 1, args.length); + Class<?> c = Class.forName(klass); + Method main = c.getDeclaredMethod("main", String[].class); + + LOG.info("\n\n\t\tRUNNING {} with args {}\n\n", main, Arrays.toString(newArgs)); + main.invoke(null, (Object) newArgs); + return (Void) null; + }, ttl); + + //Sometimes external things used with testing don't shut down all the way + System.exit(0); + } + /** * Checks if Nimbuses have elected a leader. + * * @return boolean + * * @throws AuthorizationException * @throws TException */ private boolean hasLeader() throws AuthorizationException, TException { ClusterSummary summary = getNimbus().getClusterInfo(); if (summary.is_set_nimbuses()) { - for (NimbusSummary sum: summary.get_nimbuses()) { + for (NimbusSummary sum : summary.get_nimbuses()) { if (sum.is_isLeader()) { return true; } @@ -520,54 +361,22 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { public Nimbus getNimbus() { return nimbus; } - + /** * @return the base config for the daemons. */ public Map<String, Object> getDaemonConf() { return new HashMap<>(daemonConf); } - - public static final KillOptions KILL_NOW = new KillOptions(); - - static { - KILL_NOW.set_wait_secs(0); - } - - /** - * When running a topology locally, for tests etc. It is helpful to be sure - * that the topology is dead before the test exits. This is an AutoCloseable - * topology that not only gives you access to the compiled StormTopology - * but also will kill the topology when it closes. - * - * try (LocalTopology testTopo = cluster.submitTopology("testing", ...)) { - * // Run Some test - * } - * // The topology has been killed - */ - public class LocalTopology extends StormTopology implements ILocalTopology { - private static final long serialVersionUID = 6145919776650637748L; - private final String topoName; - - public LocalTopology(String topoName, StormTopology topo) { - super(topo); - this.topoName = topoName; - } - @Override - public void close() throws TException { - killTopologyWithOpts(topoName, KILL_NOW); - } - } - @Override public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, StormTopology topology) - throws TException { + throws TException { if (!Utils.isValidConf(conf)) { throw new IllegalArgumentException("Topology conf is not json-serializable"); } getNimbus().submitTopology(topologyName, null, JSONValue.toJSONString(conf), Utils.addVersions(topology)); - + ISubmitterHook hook = (ISubmitterHook) Utils.getConfiguredClass(conf, Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN); if (hook != null) { TopologyInfo topologyInfo = Utils.getTopologyInfo(topologyName, null, conf); @@ -581,27 +390,29 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { } @Override - public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, StormTopology topology, SubmitOptions submitOpts) - throws TException { + public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, StormTopology topology, + SubmitOptions submitOpts) + throws TException { if (!Utils.isValidConf(conf)) { throw new IllegalArgumentException("Topology conf is not json-serializable"); } - getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf), Utils.addVersions(topology), submitOpts); + getNimbus().submitTopologyWithOpts(topologyName, null, JSONValue.toJSONString(conf), Utils.addVersions(topology), submitOpts); return new LocalTopology(topologyName, topology); } @Override public LocalTopology submitTopology(String topologyName, Map<String, Object> conf, TrackedTopology topology) - throws TException { + throws TException { return submitTopology(topologyName, conf, topology.getTopology()); } @Override - public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, TrackedTopology topology, SubmitOptions submitOpts) - throws TException { - return submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts); + public LocalTopology submitTopologyWithOpts(String topologyName, Map<String, Object> conf, TrackedTopology topology, + SubmitOptions submitOpts) + throws TException { + return submitTopologyWithOpts(topologyName, conf, topology.getTopology(), submitOpts); } - + @Override public void uploadNewCredentials(String topologyName, Credentials creds) throws TException { getNimbus().uploadNewCredentials(topologyName, creds); @@ -690,7 +501,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { if (getClusterState() != null) { getClusterState().disconnect(); } - for (Supervisor s: supervisors) { + for (Supervisor s : supervisors) { s.shutdownAllWorkers(null, ReadClusterState.THREAD_DUMP_ON_ERROR); s.close(); } @@ -700,45 +511,47 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { zookeeper.close(); LOG.info("Done shutting down in process zookeeper"); } - - for (TmpPath p: tmpDirs) { + + for (TmpPath p : tmpDirs) { p.close(); } - + if (this.trackId != null) { LOG.warn("Clearing tracked metrics for ID {}", this.trackId); LocalExecutor.clearTrackId(); RegisteredGlobalState.clearState(this.trackId); } - + if (this.commonInstaller != null) { this.commonInstaller.close(); } - + if (time != null) { time.close(); } } - + /** * Get a specific Supervisor. This is intended mostly for internal testing. + * * @param id the id of the supervisor */ public synchronized Supervisor getSupervisor(String id) { - for (Supervisor s: supervisors) { + for (Supervisor s : supervisors) { if (id.equals(s.getId())) { return s; } } return null; } - + /** * Kill a specific supervisor. This is intended mostly for internal testing. + * * @param id the id of the supervisor */ public synchronized void killSupervisor(String id) { - for (Iterator<Supervisor> it = supervisors.iterator(); it.hasNext();) { + for (Iterator<Supervisor> it = supervisors.iterator(); it.hasNext(); ) { Supervisor s = it.next(); if (id.equals(s.getId())) { it.remove(); @@ -748,7 +561,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { } } } - + /** * Add another supervisor to the topology. This is intended mostly for internal testing. */ @@ -758,26 +571,29 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { /** * Add another supervisor to the topology. This is intended mostly for internal testing. + * * @param ports the number of ports/slots the supervisor should have */ public Supervisor addSupervisor(Number ports) throws Exception { return addSupervisor(ports, null, null); } - + /** * Add another supervisor to the topology. This is intended mostly for internal testing. + * * @param ports the number of ports/slots the supervisor should have - * @param id the id of the new supervisor, so you can find it later. + * @param id the id of the new supervisor, so you can find it later. */ public Supervisor addSupervisor(Number ports, String id) throws Exception { return addSupervisor(ports, null, id); } - + /** * Add another supervisor to the topology. This is intended mostly for internal testing. + * * @param ports the number of ports/slots the supervisor should have - * @param conf any config values that should be added/over written in the daemon conf of the cluster. - * @param id the id of the new supervisor, so you can find it later. + * @param conf any config values that should be added/over written in the daemon conf of the cluster. + * @param id the id of the new supervisor, so you can find it later. */ public synchronized Supervisor addSupervisor(Number ports, Map<String, Object> conf, String id) throws Exception { if (ports == null) { @@ -785,19 +601,19 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { } TmpPath tmpDir = new TmpPath(); tmpDirs.add(tmpDir); - + List<Integer> portNumbers = new ArrayList<>(ports.intValue()); for (int i = 0; i < ports.intValue(); i++) { portNumbers.add(portCounter.getAndIncrement()); } - + Map<String, Object> superConf = new HashMap<>(daemonConf); if (conf != null) { superConf.putAll(conf); } superConf.put(Config.STORM_LOCAL_DIR, tmpDir.getPath()); superConf.put(DaemonConfig.SUPERVISOR_SLOTS_PORTS, portNumbers); - + final String superId = id == null ? Utils.uuid() : id; ISupervisor isuper = new StandaloneSupervisor() { @Override @@ -808,7 +624,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { if (!ConfigUtils.isLocalMode(superConf)) { throw new IllegalArgumentException("Cannot start server in distrubuted mode!"); } - + Supervisor s = new Supervisor(superConf, sharedContext, isuper); s.launch(); s.setLocalNimbus(this.nimbus); @@ -816,51 +632,39 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { supervisors.add(s); return s; } - + private boolean areAllSupervisorsWaiting() { boolean ret = true; - for (Supervisor s: supervisors) { + for (Supervisor s : supervisors) { ret = ret && s.isWaiting(); } return ret; } - - private static boolean areAllWorkersWaiting() { - boolean ret = true; - for (Shutdownable s: ProcessSimulator.getAllProcessHandles()) { - if (s instanceof DaemonCommon) { - ret = ret && ((DaemonCommon)s).isWaiting(); - } - } - return ret; - } - + /** - * Wait for the cluster to be idle. This is intended to be used with - * Simulated time and is for internal testing. + * Wait for the cluster to be idle. This is intended to be used with Simulated time and is for internal testing. + * * @throws InterruptedException if interrupted while waiting. - * @throws AssertionError if the cluster did not come to an idle point with - * a timeout. + * @throws AssertionError if the cluster did not come to an idle point with a timeout. */ public void waitForIdle() throws InterruptedException { waitForIdle(Testing.TEST_TIMEOUT_MS); } - + /** - * Wait for the cluster to be idle. This is intended to be used with - * Simulated time and is for internal testing. + * Wait for the cluster to be idle. This is intended to be used with Simulated time and is for internal testing. + * * @param timeoutMs the number of ms to wait before throwing an error. * @throws InterruptedException if interrupted while waiting. - * @throws AssertionError if the cluster did not come to an idle point with - * a timeout. + * @throws AssertionError if the cluster did not come to an idle point with a timeout. */ public void waitForIdle(long timeoutMs) throws InterruptedException { Random rand = ThreadLocalRandom.current(); //wait until all workers, supervisors, and nimbus is waiting final long endTime = System.currentTimeMillis() + timeoutMs; while (!(nimbus.isWaiting() && - areAllSupervisorsWaiting() && - areAllWorkersWaiting())) { + areAllSupervisorsWaiting() && + areAllWorkersWaiting())) { if (System.currentTimeMillis() >= endTime) { LOG.info("Cluster was not idle in {} ms", timeoutMs); LOG.info(Utils.threadDump()); @@ -869,12 +673,12 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { Thread.sleep(rand.nextInt(20)); } } - + @Override public void advanceClusterTime(int secs) throws InterruptedException { advanceClusterTime(secs, 1); } - + @Override public void advanceClusterTime(int secs, int incSecs) throws InterruptedException { for (int amountLeft = secs; amountLeft > 0; amountLeft -= incSecs) { @@ -888,17 +692,15 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { public IStormClusterState getClusterState() { return clusterState; } - + @Override public String getTrackedId() { return trackId; } - //Nimbus Compatibility - @Override public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) - throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException { + throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException { try { @SuppressWarnings("unchecked") Map<String, Object> conf = (Map<String, Object>) JSONValue.parseWithException(jsonConf); @@ -908,10 +710,12 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { } } + //Nimbus Compatibility + @Override public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, - SubmitOptions options) - throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException { + SubmitOptions options) + throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, TException { try { @SuppressWarnings("unchecked") Map<String, Object> conf = (Map<String, Object>) JSONValue.parseWithException(jsonConf); @@ -935,7 +739,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { @Override public void debug(String name, String component, boolean enable, double samplingPercentage) - throws NotAliveException, AuthorizationException, TException { + throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @@ -948,14 +752,14 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { @Override public List<ProfileRequest> getComponentPendingProfileActions(String id, String componentId, ProfileAction action) - throws TException { + throws TException { // TODO Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public String beginCreateBlob(String key, SettableBlobMeta meta) - throws AuthorizationException, KeyAlreadyExistsException, TException { + throws AuthorizationException, KeyAlreadyExistsException, TException { throw new RuntimeException("BLOBS NOT SUPPORTED IN LOCAL MODE"); } @@ -986,13 +790,13 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { @Override public void setBlobMeta(String key, SettableBlobMeta meta) - throws AuthorizationException, KeyNotFoundException, TException { + throws AuthorizationException, KeyNotFoundException, TException { throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE"); } @Override public BeginDownloadResult beginBlobDownload(String key) - throws AuthorizationException, KeyNotFoundException, TException { + throws AuthorizationException, KeyNotFoundException, TException { throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE"); } @@ -1021,7 +825,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { @Override public int updateBlobReplication(String key, int replication) - throws AuthorizationException, KeyNotFoundException, TException { + throws AuthorizationException, KeyNotFoundException, TException { throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE"); } @@ -1075,28 +879,28 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { @Override public TopologyInfo getTopologyInfoWithOpts(String id, GetInfoOptions options) - throws NotAliveException, AuthorizationException, TException { + throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public TopologyPageInfo getTopologyPageInfo(String id, String window, boolean is_include_sys) - throws NotAliveException, AuthorizationException, TException { + throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public SupervisorPageInfo getSupervisorPageInfo(String id, String host, boolean is_include_sys) - throws NotAliveException, AuthorizationException, TException { + throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public ComponentPageInfo getComponentPageInfo(String topology_id, String component_id, String window, - boolean is_include_sys) throws NotAliveException, AuthorizationException, TException { + boolean is_include_sys) throws NotAliveException, AuthorizationException, TException { // TODO Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @@ -1112,29 +916,6 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { // TODO Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } - - /** - * Run c with a local mode cluster overriding the NimbusClient and DRPCClient calls. - * NOTE local mode override happens by default now unless netty is turned on for the local cluster. - * @param c the callable to run in this mode - * @param ttlSec the number of seconds to let the cluster run after c has completed - * @return the result of calling C - * @throws Exception on any Exception. - */ - public static <T> T withLocalModeOverride(Callable<T> c, long ttlSec) throws Exception { - LOG.info("\n\n\t\tSTARTING LOCAL MODE CLUSTER\n\n"); - try (LocalCluster local = new LocalCluster(); - LocalDRPC drpc = new LocalDRPC(); - DRPCClient.LocalOverride drpcOverride = new DRPCClient.LocalOverride(drpc)) { - - T ret = c.call(); - LOG.info("\n\n\t\tRUNNING LOCAL CLUSTER for {} seconds.\n\n", ttlSec); - Thread.sleep(ttlSec * 1000); - - LOG.info("\n\n\t\tSTOPPING LOCAL MODE CLUSTER\n\n"); - return ret; - } - } @Override public List<OwnerResourceSummary> getOwnerResourceSummaries(String owner) throws AuthorizationException, TException { @@ -1161,31 +942,243 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { getNimbus().processWorkerMetrics(metrics); } - public static void main(final String [] args) throws Exception { - if (args.length < 1) { - throw new IllegalArgumentException("No class was specified to run"); + /** + * Simple way to configure a LocalCluster to meet your needs. + */ + public static class Builder { + private int supervisors = 2; + private int portsPerSupervisor = 3; + private Map<String, Object> daemonConf = new HashMap<>(); + private INimbus inimbus = null; + private IGroupMappingServiceProvider groupMapper = null; + private int supervisorSlotPortMin = 1024; + private boolean nimbusDaemon = false; + private UnaryOperator<Nimbus> nimbusWrapper = null; + private BlobStore store = null; + private TopoCache topoCache = null; + private IStormClusterState clusterState = null; + private ILeaderElector leaderElector = null; + private String trackId = null; + private boolean simulateTime = false; + + /** + * Set the number of supervisors the cluster should have. + */ + public Builder withSupervisors(int supervisors) { + if (supervisors < 0) { + throw new IllegalArgumentException("supervisors cannot be negative"); + } + this.supervisors = supervisors; + return this; } - - long ttl = 20; - String ttlString = System.getProperty("storm.local.sleeptime", "20"); - try { - ttl = Long.valueOf(ttlString); - } catch (NumberFormatException e) { - LOG.warn("could not parse the sleep time defaulting to {} seconds", ttl); + + /** + * Set the number of slots/ports each supervisor should have. + */ + public Builder withPortsPerSupervisor(int portsPerSupervisor) { + if (portsPerSupervisor < 0) { + throw new IllegalArgumentException("supervisor ports cannot be negative"); + } + this.portsPerSupervisor = portsPerSupervisor; + return this; + } + + /** + * Set the base config that the daemons should use. + */ + public Builder withDaemonConf(Map<String, Object> conf) { + if (conf != null) { + this.daemonConf = new HashMap<>(conf); + } + return this; + } + + /** + * Add an single key/value config to the daemon conf. + */ + public Builder withDaemonConf(String key, Object value) { + this.daemonConf.put(key, value); + return this; + } + + /** + * Override the INimbus instance that nimbus will use. + */ + public Builder withINimbus(INimbus inimbus) { + this.inimbus = inimbus; + return this; + } + + /** + * Override the code that maps users to groups for authorization. + */ + public Builder withGroupMapper(IGroupMappingServiceProvider groupMapper) { + this.groupMapper = groupMapper; + return this; + } + + /** + * When assigning ports to worker slots start at minPort. + */ + public Builder withSupervisorSlotPortMin(Number minPort) { + int port = 1024; + if (minPort == null) { + LOG.warn("Number is null... {}", minPort); + } else { + port = minPort.intValue(); + } + if (port <= 0) { + throw new IllegalArgumentException("port must be positive"); + } + this.supervisorSlotPortMin = port; + return this; + } + + /** + * Have the local nimbus actually launch a thrift server. This is intended to be used mostly for internal storm testing. + */ + public Builder withNimbusDaemon() { + return withNimbusDaemon(true); + } + + /** + * If nimbusDaemon is true the local nimbus will launch a thrift server. This is intended to be used mostly for internal storm + * testing. + */ + public Builder withNimbusDaemon(Boolean nimbusDaemon) { + if (nimbusDaemon == null) { + nimbusDaemon = false; + LOG.warn("nimbusDaemon is null"); + } + this.nimbusDaemon = nimbusDaemon; + return this; + } + + /** + * Turn on simulated time in the cluster. This allows someone to simulate long periods of time for timeouts etc when testing + * nimbus/supervisors themselves. NOTE: that this only works for code that uses the {@link org.apache.storm.utils.Time} class for + * time management so it will not work in all cases. + */ + public Builder withSimulatedTime() { + return withSimulatedTime(true); + } + + /** + * Turn on simulated time in the cluster. This allows someone to simulate long periods of time for timeouts etc when testing + * nimbus/supervisors themselves. NOTE: that this only works for code that uses the {@link org.apache.storm.utils.Time} class for + * time management so it will not work in all cases. + */ + public Builder withSimulatedTime(boolean simulateTime) { + this.simulateTime = simulateTime; + return this; + } + + /** + * Before nimbus is created/used call nimbusWrapper on it first and use the result instead. This is intended for internal testing + * only, and it here to allow a mocking framework to spy on the nimbus class. + */ + public Builder withNimbusWrapper(UnaryOperator<Nimbus> nimbusWrapper) { + this.nimbusWrapper = nimbusWrapper; + return this; + } + + /** + * Use the following blobstore instead of the one in the config. This is intended mostly for internal testing with Mocks. + */ + public Builder withBlobStore(BlobStore store) { + this.store = store; + return this; + } + + /** + * Use the following topo cache instead of creating out own. This is intended mostly for internal testing with Mocks. + */ + public Builder withTopoCache(TopoCache topoCache) { + this.topoCache = topoCache; + return this; + } + + /** + * Use the following clusterState instead of the one in the config. This is intended mostly for internal testing with Mocks. + */ + public Builder withClusterState(IStormClusterState clusterState) { + this.clusterState = clusterState; + return this; + } + + /** + * Use the following leaderElector instead of the one in the config. This is intended mostly for internal testing with Mocks. + */ + public Builder withLeaderElector(ILeaderElector leaderElector) { + this.leaderElector = leaderElector; + return this; + } + + /** + * A tracked cluster can run tracked topologies. See {@link org.apache.storm.testing.TrackedTopology} for more information on + * tracked topologies. + * + * @param trackId an arbitrary unique id that is used to keep track of tracked topologies + */ + public Builder withTracked(String trackId) { + this.trackId = trackId; + return this; + } + + /** + * A tracked cluster can run tracked topologies. See {@link org.apache.storm.testing.TrackedTopology} for more information on + * tracked topologies. + */ + public Builder withTracked() { + this.trackId = Utils.uuid(); + return this; + } + + /** + * Builds a new LocalCluster. + * + * @return the LocalCluster + * + * @throws Exception on any one of many different errors. This is intended for testing so yes it is ugly and throws Exception... + */ + public LocalCluster build() throws Exception { + return new LocalCluster(this); + } + } + + private static class TrackedStormCommon extends StormCommon { + + private final String id; + + public TrackedStormCommon(String id) { + this.id = id; + } + + @Override + public IBolt makeAckerBoltImpl() { + return new NonRichBoltTracker(new Acker(), id); + } + } + + /** + * When running a topology locally, for tests etc. It is helpful to be sure that the topology is dead before the test exits. This is + * an AutoCloseable topology that not only gives you access to the compiled StormTopology but also will kill the topology when it + * closes. + * + * try (LocalTopology testTopo = cluster.submitTopology("testing", ...)) { // Run Some test } // The topology has been killed + */ + public class LocalTopology extends StormTopology implements ILocalTopology { + private static final long serialVersionUID = 6145919776650637748L; + private final String topoName; + + public LocalTopology(String topoName, StormTopology topo) { + super(topo); + this.topoName = topoName; + } + + @Override + public void close() throws TException { + killTopologyWithOpts(topoName, KILL_NOW); } - - withLocalModeOverride(() -> { - String klass = args[0]; - String [] newArgs = Arrays.copyOfRange(args, 1, args.length); - Class<?> c = Class.forName(klass); - Method main = c.getDeclaredMethod("main", String[].class); - - LOG.info("\n\n\t\tRUNNING {} with args {}\n\n", main, Arrays.toString(newArgs)); - main.invoke(null, (Object)newArgs); - return (Void)null; - }, ttl); - - //Sometimes external things used with testing don't shut down all the way - System.exit(0); } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/LocalDRPC.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/LocalDRPC.java b/storm-server/src/main/java/org/apache/storm/LocalDRPC.java index bd60b62..ef257ed 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalDRPC.java +++ b/storm-server/src/main/java/org/apache/storm/LocalDRPC.java @@ -15,10 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm; import java.util.Map; - import org.apache.storm.daemon.drpc.DRPC; import org.apache.storm.daemon.drpc.DRPCThrift; import org.apache.storm.generated.AuthorizationException; @@ -30,10 +30,8 @@ import org.apache.thrift.TException; /** * A Local way to test DRPC - * - * try (LocalDRPC drpc = new LocalDRPC()) { - * // Do tests - * } + * + * try (LocalDRPC drpc = new LocalDRPC()) { // Do tests } */ public class LocalDRPC implements ILocalDRPC { @@ -65,7 +63,7 @@ public class LocalDRPC implements ILocalDRPC { public void failRequest(String id) throws AuthorizationException, TException { drpc.failRequest(id, null); } - + @Override public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException, TException { @@ -82,7 +80,7 @@ public class LocalDRPC implements ILocalDRPC { ServiceRegistry.unregisterService(this.serviceId); drpc.close(); } - + @Override public void shutdown() { close(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java b/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java index 1388e07..f2e81cf 100644 --- a/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java +++ b/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java @@ -1,29 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm; -import org.apache.storm.daemon.Shutdownable; -import org.apache.storm.utils.Utils; import java.nio.channels.ClosedByInterruptException; import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - +import org.apache.storm.daemon.Shutdownable; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,13 +26,13 @@ import org.slf4j.LoggerFactory; * in place of actual processes (in cluster mode). */ public class ProcessSimulator { + protected static ConcurrentHashMap<String, Shutdownable> processMap = new ConcurrentHashMap<String, Shutdownable>(); private static Logger LOG = LoggerFactory.getLogger(ProcessSimulator.class); private static Object lock = new Object(); - protected static ConcurrentHashMap<String, Shutdownable> processMap = new ConcurrentHashMap<String, Shutdownable>(); /** * Register a process' handle - * + * * @param pid * @param shutdownable */ @@ -48,7 +42,7 @@ public class ProcessSimulator { /** * Get all process handles - * + * * @return */ public static Collection<Shutdownable> getAllProcessHandles() { @@ -57,7 +51,7 @@ public class ProcessSimulator { /** * Kill a process - * + * * @param pid */ public static void killProcess(String pid) { @@ -85,7 +79,7 @@ public class ProcessSimulator { LOG.warn("process {} not killed (Ignoring InterruptedException)", pid, e); } else if (Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, e)) { LOG.warn("process {} not killed (Ignoring ClosedByInterruptException)", pid, e); - } else if (e instanceof RuntimeException){ + } else if (e instanceof RuntimeException) { throw e; } else { //TODO once everything is in java this should not be possible any more http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/Testing.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java b/storm-server/src/main/java/org/apache/storm/Testing.java index c391926..5a8495b 100644 --- a/storm-server/src/main/java/org/apache/storm/Testing.java +++ b/storm-server/src/main/java/org/apache/storm/Testing.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm; @@ -29,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.stream.Collectors; - import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.GlobalStreamId; @@ -66,13 +59,14 @@ import org.slf4j.LoggerFactory; * A utility that helps with testing topologies, Bolts and Spouts. */ public class Testing { - private static final Logger LOG = LoggerFactory.getLogger(Testing.class); /** - * The default amount of wall time should be spent waiting for + * The default amount of wall time should be spent waiting for * specific conditions to happen. Default is 10 seconds unless * the environment variable STORM_TEST_TIMEOUT_MS is set. */ public static final int TEST_TIMEOUT_MS; + private static final Logger LOG = LoggerFactory.getLogger(Testing.class); + static { int timeout = 10_000; try { @@ -82,14 +76,7 @@ public class Testing { } TEST_TIMEOUT_MS = timeout; } - - /** - * Simply produces a boolean to see if a specific state is true or false. - */ - public static interface Condition { - public boolean exec(); - } - + /** * Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has * passed @@ -100,7 +87,7 @@ public class Testing { public static void whileTimeout(Condition condition, Runnable body) { whileTimeout(TEST_TIMEOUT_MS, condition, body); } - + /** * Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has * passed @@ -124,17 +111,17 @@ public class Testing { } LOG.debug("Condition met {}", condition); } - + /** * Convenience method for data.stream.allMatch(pred) */ public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred) { return data.stream().allMatch(pred); } - + /** * Run with simulated time - * @deprecated use ``` + * @deprecated use ``` * try (Time.SimulatedTime time = new Time.SimulatedTime()) { * ... * } @@ -147,15 +134,15 @@ public class Testing { code.run(); } } - + private static LocalCluster cluster(MkClusterParam param, boolean simulated) throws Exception { return cluster(param, null, simulated); } - + private static LocalCluster cluster(MkClusterParam param) throws Exception { return cluster(param, null, false); } - + private static LocalCluster cluster(MkClusterParam param, String id, boolean simulated) throws Exception { Integer supervisors = param.getSupervisors(); if (supervisors == null) { @@ -170,18 +157,18 @@ public class Testing { conf = new HashMap<>(); } return new LocalCluster.Builder() - .withSupervisors(supervisors) - .withPortsPerSupervisor(ports) - .withDaemonConf(conf) - .withNimbusDaemon(param.isNimbusDaemon()) - .withTracked(id) - .withSimulatedTime(simulated) - .build(); + .withSupervisors(supervisors) + .withPortsPerSupervisor(ports) + .withDaemonConf(conf) + .withNimbusDaemon(param.isNimbusDaemon()) + .withTracked(id) + .withSimulatedTime(simulated) + .build(); } - + /** * Run with a local cluster - * @deprecated use ``` + * @deprecated use ``` * try (LocalCluster cluster = new LocalCluster()) { * ... * } @@ -192,10 +179,10 @@ public class Testing { public static void withLocalCluster(TestJob code) { withLocalCluster(new MkClusterParam(), code); } - + /** * Run with a local cluster - * @deprecated use ``` + * @deprecated use ``` * try (LocalCluster cluster = new LocalCluster.Builder()....build()) { * ... * } @@ -211,10 +198,10 @@ public class Testing { throw new RuntimeException(e); } } - + /** * Run with a local cluster - * @deprecated use ``` + * @deprecated use ``` * try (LocalCluster cluster = new LocalCluster.Builder()....build()) { * ... * } @@ -235,21 +222,21 @@ public class Testing { Boolean nimbusDaemon = (Boolean) clusterConf.getOrDefault("nimbus-daemon", false); try { return new LocalCluster.Builder() - .withSupervisors(supervisors.intValue()) - .withDaemonConf(conf) - .withPortsPerSupervisor(ports.intValue()) - .withINimbus(inimbus) - .withSupervisorSlotPortMin(portMin) - .withNimbusDaemon(nimbusDaemon) - .build(); + .withSupervisors(supervisors.intValue()) + .withDaemonConf(conf) + .withPortsPerSupervisor(ports.intValue()) + .withINimbus(inimbus) + .withSupervisorSlotPortMin(portMin) + .withNimbusDaemon(nimbusDaemon) + .build(); } catch (Exception e) { throw new RuntimeException(e); } } - + /** * Run with a local cluster - * @deprecated use ``` + * @deprecated use ``` * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { * ... * } @@ -260,10 +247,10 @@ public class Testing { public static void withSimulatedTimeLocalCluster(TestJob code) { withSimulatedTimeLocalCluster(new MkClusterParam(), code); } - + /** * Run with a local cluster - * @deprecated use ``` + * @deprecated use ``` * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { * ... * } @@ -279,10 +266,10 @@ public class Testing { throw new RuntimeException(e); } } - + /** * Run with a local cluster - * @deprecated use ``` + * @deprecated use ``` * try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { * ... * } @@ -293,7 +280,7 @@ public class Testing { public static void withTrackedCluster(TestJob code) { withTrackedCluster(new MkClusterParam(), code); } - + /** * In a tracked topology some metrics are tracked. This provides a way to get those metrics. * This is intended mostly for internal testing. @@ -305,12 +292,12 @@ public class Testing { @Deprecated public static int globalAmt(String id, String key) { LOG.warn("Reading tracked metrics for ID {}", id); - return ((ConcurrentHashMap<String, AtomicInteger>)RegisteredGlobalState.getState(id)).get(key).get(); + return ((ConcurrentHashMap<String, AtomicInteger>) RegisteredGlobalState.getState(id)).get(key).get(); } - + /** * Run with a local tracked cluster - * @deprecated use ``` + * @deprecated use ``` * try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { * ... * } @@ -326,25 +313,7 @@ public class Testing { throw new RuntimeException(e); } } - - /** - * A topology that has all messages captured and can be read later on. - * This is intended mostly for internal testing. - * @param <T> the topology (tracked or regular) - */ - public static final class CapturedTopology<T> { - public final T topology; - /** - * a Bolt that will hold all of the captured data. - */ - public final TupleCaptureBolt capturer; - - public CapturedTopology(T topology, TupleCaptureBolt capturer) { - this.topology = topology; - this.capturer = capturer; - } - } - + /** * Track and capture a topology. * This is intended mostly for internal testing. @@ -353,7 +322,7 @@ public class Testing { CapturedTopology<StormTopology> captured = captureTopology(topology); return new CapturedTopology<>(new TrackedTopology(captured.topology, cluster), captured.capturer); } - + /** * Rewrites a topology so that all the tuples flowing through it are captured * @param topology the topology to rewrite @@ -362,7 +331,7 @@ public class Testing { */ public static CapturedTopology<StormTopology> captureTopology(StormTopology topology) { topology = topology.deepCopy(); //Don't modify the original - + TupleCaptureBolt capturer = new TupleCaptureBolt(); Map<GlobalStreamId, Grouping> captureBoltInputs = new HashMap<>(); for (Map.Entry<String, SpoutSpec> spoutEntry : topology.get_spouts().entrySet()) { @@ -377,7 +346,7 @@ public class Testing { } } } - + for (Entry<String, Bolt> boltEntry : topology.get_bolts().entrySet()) { String id = boltEntry.getKey(); for (Entry<String, StreamInfo> streamEntry : boltEntry.getValue().get_common().get_streams().entrySet()) { @@ -391,35 +360,36 @@ public class Testing { } } topology.put_to_bolts(Utils.uuid(), new Bolt(Thrift.serializeComponentObject(capturer), - Thrift.prepareComponentCommon(captureBoltInputs, new HashMap<>(), null))); + Thrift.prepareComponentCommon(captureBoltInputs, new HashMap<>(), null))); return new CapturedTopology<>(topology, capturer); } - + /** * Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are * instances of {@link org.apache.storm.testing.CompletableSpout} * @param cluster the cluster to submit the topology to * @param topology the topology itself - * @return a map of the component to the list of tuples it emitted. + * @return a map of the component to the list of tuples it emitted. * @throws InterruptedException * @throws TException on any error from nimbus. */ - public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, TException { + public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, + TException { return completeTopology(cluster, topology, new CompleteTopologyParam()); } - + /** * Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are * instances of {@link org.apache.storm.testing.CompletableSpout} or are overwritten by MockedSources in param * @param cluster the cluster to submit the topology to * @param topology the topology itself * @param param parameters to describe how to complete a topology. - * @return a map of the component to the list of tuples it emitted. + * @return a map of the component to the list of tuples it emitted. * @throws InterruptedException * @throws TException on any error from nimbus. */ public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology, - CompleteTopologyParam param) throws TException, InterruptedException { + CompleteTopologyParam param) throws TException, InterruptedException { Map<String, List<FixedTuple>> ret = null; IStormClusterState state = cluster.getClusterState(); CapturedTopology<StormTopology> capTopo = captureTopology(topology); @@ -432,30 +402,31 @@ public class Testing { Map<String, SpoutSpec> spouts = topology.get_spouts(); MockedSources ms = param.getMockedSources(); if (ms != null) { - for (Entry<String, List<FixedTuple>> mocked: ms.getData().entrySet()) { + for (Entry<String, List<FixedTuple>> mocked : ms.getData().entrySet()) { FixedTupleSpout newSpout = new FixedTupleSpout(mocked.getValue()); spouts.get(mocked.getKey()).set_spout_object(Thrift.serializeComponentObject(newSpout)); } } List<Object> spoutObjects = spouts.values().stream(). - map((spec) -> Thrift.deserializeComponentObject(spec.get_spout_object())).collect(Collectors.toList()); - - for (Object o: spoutObjects) { + map((spec) -> Thrift.deserializeComponentObject(spec.get_spout_object())).collect(Collectors.toList()); + + for (Object o : spoutObjects) { if (!(o instanceof CompletableSpout)) { - throw new RuntimeException("Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " + o); + throw new RuntimeException( + "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " + o); } } - - for (Object spout: spoutObjects) { - ((CompletableSpout)spout).startup(); + + for (Object spout : spoutObjects) { + ((CompletableSpout) spout).startup(); } - + cluster.submitTopology(topoName, param.getStormConf(), topology); - + if (Time.isSimulating()) { cluster.advanceClusterTime(11); } - + String topoId = state.getTopoId(topoName).get(); //Give the topology time to come up without using it to wait for the spouts to complete simulateWait(cluster); @@ -464,41 +435,41 @@ public class Testing { timeoutMs = TEST_TIMEOUT_MS; } whileTimeout(timeoutMs, - () -> !isEvery(spoutObjects, (o) -> ((CompletableSpout)o).isExhausted()), - () -> { - try { - simulateWait(cluster); - } catch (Exception e) { - throw new RuntimeException(); - } - }); + () -> !isEvery(spoutObjects, (o) -> ((CompletableSpout) o).isExhausted()), + () -> { + try { + simulateWait(cluster); + } catch (Exception e) { + throw new RuntimeException(); + } + }); KillOptions killOpts = new KillOptions(); killOpts.set_wait_secs(0); cluster.killTopologyWithOpts(topoName, killOpts); - + whileTimeout(timeoutMs, - () -> state.assignmentInfo(topoId, null) != null, - () -> { - try { - simulateWait(cluster); - } catch (Exception e) { - throw new RuntimeException(); - } - }); - + () -> state.assignmentInfo(topoId, null) != null, + () -> { + try { + simulateWait(cluster); + } catch (Exception e) { + throw new RuntimeException(); + } + }); + if (param.getCleanupState()) { for (Object o : spoutObjects) { - ((CompletableSpout)o).clean(); + ((CompletableSpout) o).clean(); } ret = capTopo.capturer.getAndRemoveResults(); } else { ret = capTopo.capturer.getAndClearResults(); } - + return ret; } - + /** * If using simulated time simulate waiting for 10 seconds. This is intended for internal testing only. */ @@ -508,7 +479,7 @@ public class Testing { Thread.sleep(100); } } - + /** * Get all of the tuples from a given component on the default stream * @param results the results of running a completed topology @@ -530,7 +501,7 @@ public class Testing { List<List<Object>> ret = new ArrayList<>(); List<FixedTuple> streamResult = results.get(componentId); if (streamResult != null) { - for (FixedTuple tuple: streamResult) { + for (FixedTuple tuple : streamResult) { if (streamId.equals(tuple.stream)) { ret.add(tuple.values); } @@ -538,7 +509,7 @@ public class Testing { } return ret; } - + /** * Create a tracked topology. * @deprecated use {@link org.apache.storm.testing.TrackedTopology} directly. @@ -547,63 +518,63 @@ public class Testing { public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology) { return new TrackedTopology(topology, cluster); } - + /** * Simulated time wait for a tracked topology. This is intended for internal testing */ public static void trackedWait(CapturedTopology<TrackedTopology> topo) { topo.topology.trackedWait(); } - + /** * Simulated time wait for a tracked topology. This is intended for internal testing */ public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt) { topo.topology.trackedWait(amt); } - + /** * Simulated time wait for a tracked topology. This is intended for internal testing */ public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs) { topo.topology.trackedWait(amt, timeoutMs); } - + /** * Simulated time wait for a tracked topology. This is intended for internal testing */ public static void trackedWait(TrackedTopology topo) { topo.trackedWait(); } - + /** * Simulated time wait for a tracked topology. This is intended for internal testing */ public static void trackedWait(TrackedTopology topo, Integer amt) { topo.trackedWait(amt); } - + /** * Simulated time wait for a tracked topology. This is intended for internal testing */ public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs) { topo.trackedWait(amt, timeoutMs); } - + /** * Simulated time wait for a cluster. This is intended for internal testing */ public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException { advanceClusterTime(cluster, secs, 1); } - + /** * Simulated time wait for a cluster. This is intended for internal testing */ public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException { cluster.advanceClusterTime(secs, step); } - + /** * Count how many times each element appears in the Collection * @param c a collection of values @@ -611,7 +582,7 @@ public class Testing { */ public static <T> Map<T, Integer> multiset(Collection<T> c) { Map<T, Integer> ret = new HashMap<T, Integer>(); - for (T t: c) { + for (T t : c) { Integer i = ret.get(t); if (i == null) { i = new Integer(0); @@ -621,40 +592,40 @@ public class Testing { } return ret; } - + private static void printRec(Object o, String prefix) { if (o instanceof Collection) { - LOG.info("{} {} ({}) [",prefix,o, o.getClass()); - for (Object sub: (Collection)o) { + LOG.info("{} {} ({}) [", prefix, o, o.getClass()); + for (Object sub : (Collection) o) { printRec(sub, prefix + " "); } - LOG.info("{} ]",prefix); + LOG.info("{} ]", prefix); } else if (o instanceof Map) { - Map<?,?> m = (Map<?,?>)o; - LOG.info("{} {} ({}) {",prefix,o, o.getClass()); - for (Map.Entry<?, ?> entry: m.entrySet()) { + Map<?, ?> m = (Map<?, ?>) o; + LOG.info("{} {} ({}) {", prefix, o, o.getClass()); + for (Map.Entry<?, ?> entry : m.entrySet()) { printRec(entry.getKey(), prefix + " "); LOG.info("{} ->", prefix); printRec(entry.getValue(), prefix + " "); } - LOG.info("{} }",prefix); + LOG.info("{} }", prefix); } else { LOG.info("{} {} ({})", prefix, o, o.getClass()); } } - + /** * Check if two collections are equivalent ignoring the order of elements. */ public static <T> boolean multiseteq(Collection<T> a, Collection<T> b) { boolean ret = multiset(a).equals(multiset(b)); - if (! ret) { + if (!ret) { printRec(multiset(a), "MS-A:"); printRec(multiset(b), "MS-B:"); } return ret; } - + /** * Create a {@link org.apache.storm.tuple.Tuple} for use with testing * @param values the values to appear in the tuple @@ -662,7 +633,7 @@ public class Testing { public static Tuple testTuple(List<Object> values) { return testTuple(values, new MkTupleParam()); } - + /** * Create a {@link org.apache.storm.tuple.Tuple} for use with testing * @param values the values to appear in the tuple @@ -673,19 +644,19 @@ public class Testing { if (stream == null) { stream = Utils.DEFAULT_STREAM_ID; } - + String component = param.getComponent(); if (component == null) { component = "component"; } - + int task = 1; - + List<String> fields = param.getFields(); if (fields == null) { fields = new ArrayList<>(values.size()); for (int i = 1; i <= values.size(); i++) { - fields.add("field"+i); + fields.add("field" + i); } } @@ -696,23 +667,48 @@ public class Testing { streamToFields.put(stream, new Fields(fields)); compToStreamToFields.put(component, streamToFields); - TopologyContext context= new TopologyContext(null, - ConfigUtils.readStormConfig(), - taskToComp, - null, - compToStreamToFields, - null, - "test-storm-id", - null, - null, - 1, - null, - null, - new HashMap<>(), - new HashMap<>(), - new HashMap<>(), - new HashMap<>(), - new AtomicBoolean(false)); + TopologyContext context = new TopologyContext(null, + ConfigUtils.readStormConfig(), + taskToComp, + null, + compToStreamToFields, + null, + "test-storm-id", + null, + null, + 1, + null, + null, + new HashMap<>(), + new HashMap<>(), + new HashMap<>(), + new HashMap<>(), + new AtomicBoolean(false)); return new TupleImpl(context, values, component, 1, stream); } + + /** + * Simply produces a boolean to see if a specific state is true or false. + */ + public static interface Condition { + public boolean exec(); + } + + /** + * A topology that has all messages captured and can be read later on. + * This is intended mostly for internal testing. + * @param <T> the topology (tracked or regular) + */ + public static final class CapturedTopology<T> { + public final T topology; + /** + * a Bolt that will hold all of the captured data. + */ + public final TupleCaptureBolt capturer; + + public CapturedTopology(T topology, TupleCaptureBolt capturer) { + this.topology = topology; + this.capturer = capturer; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java index 8a01f1e..e50575a 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobKeySequenceInfo.java @@ -1,40 +1,35 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.blobstore; public class BlobKeySequenceInfo { private String nimbusHostPort; private String sequenceNumber; - public void setNimbusHostPort(String nimbusHostPort) { - this.nimbusHostPort = nimbusHostPort; - } - - public void setSequenceNumber(String sequenceNumber) { - this.sequenceNumber = sequenceNumber; - } - public String getNimbusHostPort() { return nimbusHostPort; } + public void setNimbusHostPort(String nimbusHostPort) { + this.nimbusHostPort = nimbusHostPort; + } + public String getSequenceNumber() { return sequenceNumber; } + public void setSequenceNumber(String sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + }
