http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java index 4709b5e..8d00397 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.samples; import java.io.FileReader; import java.util.Properties; - import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt; @@ -30,119 +30,115 @@ import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; /** - * The basic scenario topology that uses EventHubSpout with PartialCountBolt - * and GlobalCountBolt. - * To submit this topology: - * storm jar {jarfile} {classname} {topologyname} {spoutconffile} + * The basic scenario topology that uses EventHubSpout with PartialCountBolt and GlobalCountBolt. To submit this topology: storm jar + * {jarfile} {classname} {topologyname} {spoutconffile} */ public class EventCount { - protected EventHubSpoutConfig spoutConfig; - protected int numWorkers; - - public EventCount() { - } - - protected void readEHConfig(String[] args) throws Exception { - Properties properties = new Properties(); - if(args.length > 1) { - properties.load(new FileReader(args[1])); + protected EventHubSpoutConfig spoutConfig; + protected int numWorkers; + + public EventCount() { } - else { - properties.load(EventCount.class.getClassLoader().getResourceAsStream( - "Config.properties")); + + public static void main(String[] args) throws Exception { + EventCount scenario = new EventCount(); + scenario.runScenario(args); } - String username = properties.getProperty("eventhubspout.username"); - String password = properties.getProperty("eventhubspout.password"); - String namespaceName = properties.getProperty("eventhubspout.namespace"); - String entityPath = properties.getProperty("eventhubspout.entitypath"); - String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress"); - String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring"); - int partitionCount = Integer.parseInt(properties.getProperty("eventhubspout.partitions.count")); - int checkpointIntervalInSeconds = Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval")); - int receiverCredits = Integer.parseInt(properties.getProperty("eventhub.receiver.credits")); - String maxPendingMsgsPerPartitionStr = properties.getProperty("eventhubspout.max.pending.messages.per.partition"); - if(maxPendingMsgsPerPartitionStr == null) { - maxPendingMsgsPerPartitionStr = "1024"; + protected void readEHConfig(String[] args) throws Exception { + Properties properties = new Properties(); + if (args.length > 1) { + properties.load(new FileReader(args[1])); + } else { + properties.load(EventCount.class.getClassLoader().getResourceAsStream( + "Config.properties")); + } + + String username = properties.getProperty("eventhubspout.username"); + String password = properties.getProperty("eventhubspout.password"); + String namespaceName = properties.getProperty("eventhubspout.namespace"); + String entityPath = properties.getProperty("eventhubspout.entitypath"); + String targetFqnAddress = properties.getProperty("eventhubspout.targetfqnaddress"); + String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring"); + int partitionCount = Integer.parseInt(properties.getProperty("eventhubspout.partitions.count")); + int checkpointIntervalInSeconds = Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval")); + int receiverCredits = Integer.parseInt(properties.getProperty("eventhub.receiver.credits")); + String maxPendingMsgsPerPartitionStr = properties.getProperty("eventhubspout.max.pending.messages.per.partition"); + if (maxPendingMsgsPerPartitionStr == null) { + maxPendingMsgsPerPartitionStr = "1024"; + } + int maxPendingMsgsPerPartition = Integer.parseInt(maxPendingMsgsPerPartitionStr); + String enqueueTimeDiffStr = properties.getProperty("eventhub.receiver.filter.timediff"); + if (enqueueTimeDiffStr == null) { + enqueueTimeDiffStr = "0"; + } + int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr); + long enqueueTimeFilter = 0; + if (enqueueTimeDiff != 0) { + enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff * 1000; + } + String consumerGroupName = properties.getProperty("eventhubspout.consumer.group.name"); + + System.out.println("Eventhub spout config: "); + System.out.println(" partition count: " + partitionCount); + System.out.println(" checkpoint interval: " + checkpointIntervalInSeconds); + System.out.println(" receiver credits: " + receiverCredits); + spoutConfig = new EventHubSpoutConfig(username, password, + namespaceName, entityPath, partitionCount, zkEndpointAddress, + checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition, + enqueueTimeFilter); + + if (targetFqnAddress != null) { + spoutConfig.setTargetAddress(targetFqnAddress); + } + spoutConfig.setConsumerGroupName(consumerGroupName); + + //set the number of workers to be the same as partition number. + //the idea is to have a spout and a partial count bolt co-exist in one + //worker to avoid shuffling messages across workers in storm cluster. + numWorkers = spoutConfig.getPartitionCount(); + + if (args.length > 0) { + //set topology name so that sample Trident topology can use it as stream name. + spoutConfig.setTopologyName(args[0]); + } } - int maxPendingMsgsPerPartition = Integer.parseInt(maxPendingMsgsPerPartitionStr); - String enqueueTimeDiffStr = properties.getProperty("eventhub.receiver.filter.timediff"); - if(enqueueTimeDiffStr == null) { - enqueueTimeDiffStr = "0"; + + protected EventHubSpout createEventHubSpout() { + EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig); + return eventHubSpout; } - int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr); - long enqueueTimeFilter = 0; - if(enqueueTimeDiff != 0) { - enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000; + + protected StormTopology buildTopology(EventHubSpout eventHubSpout) { + TopologyBuilder topologyBuilder = new TopologyBuilder(); + + topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount()) + .setNumTasks(spoutConfig.getPartitionCount()); + topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), spoutConfig.getPartitionCount()) + .localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount()); + topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1) + .globalGrouping("PartialCountBolt").setNumTasks(1); + return topologyBuilder.createTopology(); } - String consumerGroupName = properties.getProperty("eventhubspout.consumer.group.name"); - - System.out.println("Eventhub spout config: "); - System.out.println(" partition count: " + partitionCount); - System.out.println(" checkpoint interval: " + checkpointIntervalInSeconds); - System.out.println(" receiver credits: " + receiverCredits); - spoutConfig = new EventHubSpoutConfig(username, password, - namespaceName, entityPath, partitionCount, zkEndpointAddress, - checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition, - enqueueTimeFilter); - - if(targetFqnAddress != null) - { - spoutConfig.setTargetAddress(targetFqnAddress); + + protected void submitTopology(String[] args, StormTopology topology) throws Exception { + Config config = new Config(); + config.setDebug(false); + //Enable metrics + config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1); + + String topoName = "test"; + if (args.length > 0) { + topoName = args[0]; + } + config.setNumWorkers(numWorkers); + StormSubmitter.submitTopology(topoName, config, topology); } - spoutConfig.setConsumerGroupName(consumerGroupName); - - //set the number of workers to be the same as partition number. - //the idea is to have a spout and a partial count bolt co-exist in one - //worker to avoid shuffling messages across workers in storm cluster. - numWorkers = spoutConfig.getPartitionCount(); - - if(args.length > 0) { - //set topology name so that sample Trident topology can use it as stream name. - spoutConfig.setTopologyName(args[0]); + + protected void runScenario(String[] args) throws Exception { + readEHConfig(args); + EventHubSpout eventHubSpout = createEventHubSpout(); + StormTopology topology = buildTopology(eventHubSpout); + submitTopology(args, topology); } - } - - protected EventHubSpout createEventHubSpout() { - EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig); - return eventHubSpout; - } - - protected StormTopology buildTopology(EventHubSpout eventHubSpout) { - TopologyBuilder topologyBuilder = new TopologyBuilder(); - - topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount()) - .setNumTasks(spoutConfig.getPartitionCount()); - topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), spoutConfig.getPartitionCount()) - .localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount()); - topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1) - .globalGrouping("PartialCountBolt").setNumTasks(1); - return topologyBuilder.createTopology(); - } - - protected void submitTopology(String[] args, StormTopology topology) throws Exception { - Config config = new Config(); - config.setDebug(false); - //Enable metrics - config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1); - - String topoName = "test"; - if (args.length > 0) { - topoName = args[0]; - } - config.setNumWorkers(numWorkers); - StormSubmitter.submitTopology(topoName, config, topology); - } - - protected void runScenario(String[] args) throws Exception{ - readEHConfig(args); - EventHubSpout eventHubSpout = createEventHubSpout(); - StormTopology topology = buildTopology(eventHubSpout); - submitTopology(args, topology); - } - - public static void main(String[] args) throws Exception { - EventCount scenario = new EventCount(); - scenario.runScenario(args); - } }
http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java index 32dc0d5..55d4cf2 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java @@ -15,38 +15,38 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.storm.eventhubs.samples; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.topology.TopologyBuilder; +package org.apache.storm.eventhubs.samples; import org.apache.storm.eventhubs.bolt.EventHubBolt; import org.apache.storm.eventhubs.bolt.EventHubBoltConfig; import org.apache.storm.eventhubs.spout.EventHubSpout; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.TopologyBuilder; /** * A sample topology that loops message back to EventHub */ public class EventHubLoop extends EventCount { - @Override - protected StormTopology buildTopology(EventHubSpout eventHubSpout) { - TopologyBuilder topologyBuilder = new TopologyBuilder(); + public static void main(String[] args) throws Exception { + EventHubLoop scenario = new EventHubLoop(); + scenario.runScenario(args); + } + + @Override + protected StormTopology buildTopology(EventHubSpout eventHubSpout) { + TopologyBuilder topologyBuilder = new TopologyBuilder(); + + topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount()) + .setNumTasks(spoutConfig.getPartitionCount()); + EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(), + spoutConfig.getEntityPath(), true); - topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount()) - .setNumTasks(spoutConfig.getPartitionCount()); - EventHubBoltConfig boltConfig = new EventHubBoltConfig(spoutConfig.getConnectionString(), - spoutConfig.getEntityPath(), true); - - EventHubBolt eventHubBolt = new EventHubBolt(boltConfig); - int boltTasks = spoutConfig.getPartitionCount(); - topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks) - .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks); - return topologyBuilder.createTopology(); - } - - public static void main(String[] args) throws Exception { - EventHubLoop scenario = new EventHubLoop(); - scenario.runScenario(args); - } + EventHubBolt eventHubBolt = new EventHubBolt(boltConfig); + int boltTasks = spoutConfig.getPartitionCount(); + topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks) + .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks); + return topologyBuilder.createTopology(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java index a433b8b..abcad5a 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java @@ -15,39 +15,40 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.samples; +import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter; +import org.apache.storm.eventhubs.spout.EventHubSpout; +import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout; +import org.apache.storm.generated.StormTopology; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.operation.builtin.Count; import org.apache.storm.trident.operation.builtin.Sum; import org.apache.storm.trident.testing.MemoryMapState; -import org.apache.storm.generated.StormTopology; import org.apache.storm.tuple.Fields; -import org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter; -import org.apache.storm.eventhubs.spout.EventHubSpout; -import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout; - /** * A simple Trident topology uses OpaqueTridentEventHubSpout */ public class OpaqueTridentEventCount extends EventCount { - @Override - protected StormTopology buildTopology(EventHubSpout eventHubSpout) { - TridentTopology topology = new TridentTopology(); - - OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig); - TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout) - .parallelismHint(spoutConfig.getPartitionCount()) - .aggregate(new Count(), new Fields("partial-count")) - .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count")); - state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000)); - return topology.build(); - } - - public static void main(String[] args) throws Exception { - OpaqueTridentEventCount scenario = new OpaqueTridentEventCount(); - scenario.runScenario(args); - } + public static void main(String[] args) throws Exception { + OpaqueTridentEventCount scenario = new OpaqueTridentEventCount(); + scenario.runScenario(args); + } + + @Override + protected StormTopology buildTopology(EventHubSpout eventHubSpout) { + TridentTopology topology = new TridentTopology(); + + OpaqueTridentEventHubSpout spout = new OpaqueTridentEventHubSpout(spoutConfig); + TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout) + .parallelismHint(spoutConfig.getPartitionCount()) + .aggregate(new Count(), new Fields("partial-count")) + .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), + new Fields("count")); + state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000)); + return topology.build(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java index 718c229..646dc9c 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java @@ -15,17 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.storm.eventhubs.samples; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.generated.StormTopology; -import org.apache.storm.tuple.Fields; +package org.apache.storm.eventhubs.samples; import org.apache.storm.eventhubs.spout.EventHubSpout; import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout; - +import org.apache.storm.generated.StormTopology; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.operation.BaseFilter; @@ -33,49 +28,54 @@ import org.apache.storm.trident.operation.builtin.Count; import org.apache.storm.trident.operation.builtin.Sum; import org.apache.storm.trident.testing.MemoryMapState; import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A simple Trident topology uses TransactionalTridentEventHubSpout */ public class TransactionalTridentEventCount extends EventCount { - public static class LoggingFilter extends BaseFilter { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class); - private final String prefix; - private final long logIntervalMs; - private long lastTime; - public LoggingFilter(String prefix, int logIntervalMs) { - this.prefix = prefix; - this.logIntervalMs = logIntervalMs; - lastTime = System.nanoTime(); + public static void main(String[] args) throws Exception { + TransactionalTridentEventCount scenario = new TransactionalTridentEventCount(); + scenario.runScenario(args); } @Override - public boolean isKeep(TridentTuple tuple) { - long now = System.nanoTime(); - if(logIntervalMs < (now - lastTime) / 1000000) { - logger.info(prefix + tuple.toString()); - lastTime = now; - } - return false; + protected StormTopology buildTopology(EventHubSpout eventHubSpout) { + TridentTopology topology = new TridentTopology(); + + TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig); + TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout) + .parallelismHint(spoutConfig.getPartitionCount()) + .aggregate(new Count(), new Fields("partial-count")) + .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), + new Fields("count")); + state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000)); + return topology.build(); + } + + public static class LoggingFilter extends BaseFilter { + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(LoggingFilter.class); + private final String prefix; + private final long logIntervalMs; + private long lastTime; + + public LoggingFilter(String prefix, int logIntervalMs) { + this.prefix = prefix; + this.logIntervalMs = logIntervalMs; + lastTime = System.nanoTime(); + } + + @Override + public boolean isKeep(TridentTuple tuple) { + long now = System.nanoTime(); + if (logIntervalMs < (now - lastTime) / 1000000) { + logger.info(prefix + tuple.toString()); + lastTime = now; + } + return false; + } } - } - - @Override - protected StormTopology buildTopology(EventHubSpout eventHubSpout) { - TridentTopology topology = new TridentTopology(); - - TransactionalTridentEventHubSpout spout = new TransactionalTridentEventHubSpout(spoutConfig); - TridentState state = topology.newStream("stream-" + spoutConfig.getTopologyName(), spout) - .parallelismHint(spoutConfig.getPartitionCount()) - .aggregate(new Count(), new Fields("partial-count")) - .persistentAggregate(new MemoryMapState.Factory(), new Fields("partial-count"), new Sum(), new Fields("count")); - state.newValuesStream().each(new Fields("count"), new LoggingFilter("got count: ", 10000)); - return topology.build(); - } - - public static void main(String[] args) throws Exception { - TransactionalTridentEventCount scenario = new TransactionalTridentEventCount(); - scenario.runScenario(args); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java index 9d94f5f..cf3fb76 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java @@ -15,15 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.samples.bolt; import java.util.HashMap; import java.util.Map; - -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.storm.Config; import org.apache.storm.metric.api.IMetric; import org.apache.storm.task.TopologyContext; @@ -31,58 +27,61 @@ import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Globally count number of messages */ public class GlobalCountBolt extends BaseBasicBolt { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(GlobalCountBolt.class); - private long globalCount; - private long globalCountDiff; - private long lastMetricsTime; - private long throughput; - - @Override - public void prepare(Map<String, Object> config, TopologyContext context) { - globalCount = 0; - globalCountDiff = 0; - lastMetricsTime = System.nanoTime(); - context.registerMetric("GlobalMessageCount", new IMetric() { - @Override - public Object getValueAndReset() { - long now = System.nanoTime(); - long millis = (now - lastMetricsTime) / 1000000; - throughput = globalCountDiff / millis * 1000; - Map<String, Object> values = new HashMap<>(); - values.put("global_count", globalCount); - values.put("throughput", throughput); - lastMetricsTime = now; - globalCountDiff = 0; - return values; - } - }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)); - } + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory + .getLogger(GlobalCountBolt.class); + private long globalCount; + private long globalCountDiff; + private long lastMetricsTime; + private long throughput; - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - if (TupleUtils.isTick(tuple)) { - return; + @Override + public void prepare(Map<String, Object> config, TopologyContext context) { + globalCount = 0; + globalCountDiff = 0; + lastMetricsTime = System.nanoTime(); + context.registerMetric("GlobalMessageCount", new IMetric() { + @Override + public Object getValueAndReset() { + long now = System.nanoTime(); + long millis = (now - lastMetricsTime) / 1000000; + throughput = globalCountDiff / millis * 1000; + Map<String, Object> values = new HashMap<>(); + values.put("global_count", globalCount); + values.put("throughput", throughput); + lastMetricsTime = now; + globalCountDiff = 0; + return values; + } + }, (Integer) config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)); } - int partial = (Integer)tuple.getValueByField("partial_count"); - globalCount += partial; - globalCountDiff += partial; - if((globalCountDiff == partial) && (globalCount != globalCountDiff)) { - //metrics has just been collected, let's also log it - logger.info("Current throughput (messages/second): " + throughput); + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + if (TupleUtils.isTick(tuple)) { + return; + } + + int partial = (Integer) tuple.getValueByField("partial_count"); + globalCount += partial; + globalCountDiff += partial; + if ((globalCountDiff == partial) && (globalCount != globalCountDiff)) { + //metrics has just been collected, let's also log it + logger.info("Current throughput (messages/second): " + throughput); + } } - } - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java index 3763c69..e51dc08 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java @@ -15,14 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.samples.bolt; import java.util.Map; - -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -30,39 +26,42 @@ import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import org.apache.storm.utils.TupleUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Partially count number of messages from EventHubs */ public class PartialCountBolt extends BaseBasicBolt { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(PartialCountBolt.class); - private static final int PartialCountBatchSize = 1000; - - private int partialCount; - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context) { - partialCount = 0; - } - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - if (TupleUtils.isTick(tuple)) { - return; + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory + .getLogger(PartialCountBolt.class); + private static final int PartialCountBatchSize = 1000; + + private int partialCount; + + @Override + public void prepare(Map<String, Object> topoConf, TopologyContext context) { + partialCount = 0; } - partialCount++; - if(partialCount == PartialCountBatchSize) { - collector.emit(new Values(PartialCountBatchSize)); - partialCount = 0; + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + if (TupleUtils.isTick(tuple)) { + return; + } + + partialCount++; + if (partialCount == PartialCountBatchSize) { + collector.emit(new Values(PartialCountBatchSize)); + partialCount = 0; + } } - } - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("partial_count")); - } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("partial_count")); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java index 09fb60f..7f6e697 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java @@ -15,71 +15,70 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventData; - import java.util.Map; /** - * A mock receiver that emits fake data with offset starting from given offset - * and increase by 1 each time. + * A mock receiver that emits fake data with offset starting from given offset and increase by 1 each time. */ public class EventHubReceiverMock implements IEventHubReceiver { - private static boolean isPaused = false; - private final String partitionId; - private long currentOffset; - private boolean isOpen; + private static boolean isPaused = false; + private final String partitionId; + private long currentOffset; + private boolean isOpen; + + public EventHubReceiverMock(String pid) { + partitionId = pid; + isPaused = false; + } - public EventHubReceiverMock(String pid) { - partitionId = pid; - isPaused = false; - } - - /** - * Use this method to pause/resume all the receivers. - * If paused all receiver will return null. - * @param val - */ - public static void setPause(boolean val) { - isPaused = val; - } + /** + * Use this method to pause/resume all the receivers. If paused all receiver will return null. + * + * @param val + */ + public static void setPause(boolean val) { + isPaused = val; + } - @Override - public void open(IEventFilter filter) throws EventHubException { - currentOffset = filter.getOffset() != null ? + @Override + public void open(IEventFilter filter) throws EventHubException { + currentOffset = filter.getOffset() != null ? Long.parseLong(filter.getOffset()) : filter.getTime().toEpochMilli(); - isOpen = true; - } - - @Override - public void close() { - isOpen = false; - } + isOpen = true; + } - @Override - public boolean isOpen() { - return isOpen; - } + @Override + public void close() { + isOpen = false; + } - @Override - public EventDataWrap receive() { - if(isPaused) { - return null; + @Override + public boolean isOpen() { + return isOpen; } - currentOffset++; + @Override + public EventDataWrap receive() { + if (isPaused) { + return null; + } + + currentOffset++; - //the body of the message is "message" + currentOffset, e.g. "message123" + //the body of the message is "message" + currentOffset, e.g. "message123" - MessageId mid = new MessageId(partitionId, "" + currentOffset, currentOffset); - EventData ed = new EventData(("message" + currentOffset).getBytes()); - return EventDataWrap.create(ed,mid); - } - - @Override - public Map<String, Object> getMetricsData() { - return null; - } + MessageId mid = new MessageId(partitionId, "" + currentOffset, currentOffset); + EventData ed = new EventData(("message" + currentOffset).getBytes()); + return EventDataWrap.create(ed, mid); + } + + @Override + public Map<String, Object> getMetricsData() { + return null; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java index 7eb625c..16f3b80 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import org.apache.storm.spout.SpoutOutputCollector; @@ -23,74 +24,69 @@ import org.apache.storm.spout.SpoutOutputCollector; * Mocks EventHubSpout's caller (storm framework) */ public class EventHubSpoutCallerMock { - public static final String statePathPrefix = "/eventhubspout/TestTopo/namespace/entityname/partitions/"; - EventHubSpout spout; - private IStateStore stateStore; - private SpoutOutputCollectorMock collector; - - public EventHubSpoutCallerMock(int totalPartitions, - int totalTasks, int taskIndex, int checkpointInterval) { - stateStore = new StateStoreMock(); - EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password", - "namespace", "entityname", totalPartitions, "zookeeper", checkpointInterval, 1024); - conf.setTopologyName("TestTopo"); - - IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() { - @Override - public IEventHubReceiver create(EventHubSpoutConfig config, - String partitionId) { - return new EventHubReceiverMock(partitionId); - } - }; - //mock state store and receiver - spout = new EventHubSpout(conf, stateStore, null, recvFactory); - - collector = new SpoutOutputCollectorMock(); - - try { - spout.preparePartitions(null, totalTasks, taskIndex, new SpoutOutputCollector(collector)); - } - catch(Exception ex) { - } - } - - /** - * Execute a sequence of calls to EventHubSpout. - * - * @param callSequence: is represented as a string of commands, - * e.g. "r,r,r,r,a1,f2,...". The commands are: - * r[N]: receive() called N times - * aP_X: ack(P_X), partition: P, offset: X - * fP_Y: fail(P_Y), partition: P, offset: Y - */ - public String execute(String callSequence) { - String[] cmds = callSequence.split(","); - for(String cmd : cmds) { - if(cmd.startsWith("r")) { - int count = 1; - if(cmd.length() > 1) { - count = Integer.parseInt(cmd.substring(1)); + public static final String statePathPrefix = "/eventhubspout/TestTopo/namespace/entityname/partitions/"; + EventHubSpout spout; + private IStateStore stateStore; + private SpoutOutputCollectorMock collector; + + public EventHubSpoutCallerMock(int totalPartitions, + int totalTasks, int taskIndex, int checkpointInterval) { + stateStore = new StateStoreMock(); + EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password", + "namespace", "entityname", totalPartitions, "zookeeper", checkpointInterval, + 1024); + conf.setTopologyName("TestTopo"); + + IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() { + @Override + public IEventHubReceiver create(EventHubSpoutConfig config, + String partitionId) { + return new EventHubReceiverMock(partitionId); + } + }; + //mock state store and receiver + spout = new EventHubSpout(conf, stateStore, null, recvFactory); + + collector = new SpoutOutputCollectorMock(); + + try { + spout.preparePartitions(null, totalTasks, taskIndex, new SpoutOutputCollector(collector)); + } catch (Exception ex) { } - for(int i=0; i<count; ++i) { - spout.nextTuple(); + } + + /** + * Execute a sequence of calls to EventHubSpout. + * + * @param callSequence: is represented as a string of commands, e.g. "r,r,r,r,a1,f2,...". The commands are: r[N]: receive() called N + * times aP_X: ack(P_X), partition: P, offset: X fP_Y: fail(P_Y), partition: P, offset: Y + */ + public String execute(String callSequence) { + String[] cmds = callSequence.split(","); + for (String cmd : cmds) { + if (cmd.startsWith("r")) { + int count = 1; + if (cmd.length() > 1) { + count = Integer.parseInt(cmd.substring(1)); + } + for (int i = 0; i < count; ++i) { + spout.nextTuple(); + } + } else if (cmd.startsWith("a")) { + String[] midStrs = cmd.substring(1).split("_"); + MessageId msgId = new MessageId(midStrs[0], midStrs[1], Long.parseLong(midStrs[1])); + spout.ack(msgId); + } else if (cmd.startsWith("f")) { + String[] midStrs = cmd.substring(1).split("_"); + MessageId msgId = new MessageId(midStrs[0], midStrs[1], Long.parseLong(midStrs[1])); + spout.fail(msgId); + } } - } - else if(cmd.startsWith("a")) { - String[] midStrs = cmd.substring(1).split("_"); - MessageId msgId = new MessageId(midStrs[0], midStrs[1], Long.parseLong(midStrs[1])); - spout.ack(msgId); - } - else if(cmd.startsWith("f")) { - String[] midStrs = cmd.substring(1).split("_"); - MessageId msgId = new MessageId(midStrs[0], midStrs[1], Long.parseLong(midStrs[1])); - spout.fail(msgId); - } + return collector.getOffsetSequenceAndReset(); + } + + public String getCheckpoint(int partitionIndex) { + String statePath = statePathPrefix + partitionIndex; + return stateStore.readData(statePath); } - return collector.getOffsetSequenceAndReset(); - } - - public String getCheckpoint(int partitionIndex) { - String statePath = statePathPrefix + partitionIndex; - return stateStore.readData(statePath); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java index 467461c..364990e 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java @@ -15,87 +15,81 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; /** * This mock exercises PartitionManager */ public class PartitionManagerCallerMock { - public static final String statePath = "/eventhubspout/TestTopo/namespace/entityname/partitions/1"; - private IPartitionManager pm; - private IStateStore stateStore; + public static final String statePath = "/eventhubspout/TestTopo/namespace/entityname/partitions/1"; + private IPartitionManager pm; + private IStateStore stateStore; - public PartitionManagerCallerMock(String partitionId) { - this(partitionId, 0); - } - - public PartitionManagerCallerMock(String partitionId, long enqueueTimeFilter) { - EventHubReceiverMock receiver = new EventHubReceiverMock(partitionId); - EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password", - "namespace", "entityname", 16, "zookeeper", 10, 1024, 1024, enqueueTimeFilter); - conf.setTopologyName("TestTopo"); - stateStore = new StateStoreMock(); - this.pm = new PartitionManager(conf, partitionId, stateStore, receiver); - - stateStore.open(); - try { - pm.open(); + public PartitionManagerCallerMock(String partitionId) { + this(partitionId, 0); } - catch (Exception ex) { + + public PartitionManagerCallerMock(String partitionId, long enqueueTimeFilter) { + EventHubReceiverMock receiver = new EventHubReceiverMock(partitionId); + EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password", + "namespace", "entityname", 16, "zookeeper", 10, 1024, 1024, enqueueTimeFilter); + conf.setTopologyName("TestTopo"); + stateStore = new StateStoreMock(); + this.pm = new PartitionManager(conf, partitionId, stateStore, receiver); + + stateStore.open(); + try { + pm.open(); + } catch (Exception ex) { + } } - } - - /** - * Execute a sequence of calls to Partition Manager. - * - * @param callSequence: is represented as a string of commands, - * e.g. "r,r,r,r,a1,f2,...". The commands are: - * r[N]: receive() called N times - * aX: ack(X) - * fY: fail(Y) - * - * @return the sequence of messages the receive call returns - */ - public String execute(String callSequence) { - - String[] cmds = callSequence.split(","); - StringBuilder ret = new StringBuilder(); - for(String cmd : cmds) { - if(cmd.startsWith("r")) { - int count = 1; - if(cmd.length() > 1) { - count = Integer.parseInt(cmd.substring(1)); + + /** + * Execute a sequence of calls to Partition Manager. + * + * @param callSequence: is represented as a string of commands, e.g. "r,r,r,r,a1,f2,...". The commands are: r[N]: receive() called N + * times aX: ack(X) fY: fail(Y) + * @return the sequence of messages the receive call returns + */ + public String execute(String callSequence) { + + String[] cmds = callSequence.split(","); + StringBuilder ret = new StringBuilder(); + for (String cmd : cmds) { + if (cmd.startsWith("r")) { + int count = 1; + if (cmd.length() > 1) { + count = Integer.parseInt(cmd.substring(1)); + } + for (int i = 0; i < count; ++i) { + EventDataWrap ed = pm.receive(); + if (ed == null) { + ret.append("null,"); + } else { + ret.append(ed.getMessageId().getOffset()); + ret.append(","); + } + } + } else if (cmd.startsWith("a")) { + pm.ack(cmd.substring(1)); + } else if (cmd.startsWith("f")) { + pm.fail(cmd.substring(1)); + } } - for(int i=0; i<count; ++i) { - EventDataWrap ed = pm.receive(); - if(ed == null) { - ret.append("null,"); - } - else { - ret.append(ed.getMessageId().getOffset()); - ret.append(","); - } + if (ret.length() > 0) { + ret.setLength(ret.length() - 1); } - } - else if(cmd.startsWith("a")) { - pm.ack(cmd.substring(1)); - } - else if(cmd.startsWith("f")) { - pm.fail(cmd.substring(1)); - } + return ret.toString(); } - if(ret.length() > 0) { - ret.setLength(ret.length()-1); + + /** + * Exercise the IPartitionManager.checkpoint() method + * + * @return the offset that we write to state store + */ + public String checkpoint() { + pm.checkpoint(); + return stateStore.readData(statePath); } - return ret.toString(); - } - - /** - * Exercise the IPartitionManager.checkpoint() method - * @return the offset that we write to state store - */ - public String checkpoint() { - pm.checkpoint(); - return stateStore.readData(statePath); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java index 621d6a8..37dfdd5 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java @@ -15,57 +15,57 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import java.util.List; - import org.apache.storm.spout.ISpoutOutputCollector; /** * Mock of ISpoutOutputCollector */ public class SpoutOutputCollectorMock implements ISpoutOutputCollector { - //comma separated offsets - StringBuilder emittedOffset; - - public SpoutOutputCollectorMock() { - emittedOffset = new StringBuilder(); - } - - public String getOffsetSequenceAndReset() { - String ret = null; - if(emittedOffset.length() > 0) { - emittedOffset.setLength(emittedOffset.length()-1); - ret = emittedOffset.toString(); - emittedOffset.setLength(0); + //comma separated offsets + StringBuilder emittedOffset; + + public SpoutOutputCollectorMock() { + emittedOffset = new StringBuilder(); + } + + public String getOffsetSequenceAndReset() { + String ret = null; + if (emittedOffset.length() > 0) { + emittedOffset.setLength(emittedOffset.length() - 1); + ret = emittedOffset.toString(); + emittedOffset.setLength(0); + } + return ret; } - return ret; - } - @Override - public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { - MessageId mid = (MessageId)messageId; - String pid = mid.getPartitionId(); - String offset = mid.getOffset(); - emittedOffset.append(pid+"_"+offset+","); - return null; - } + @Override + public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { + MessageId mid = (MessageId) messageId; + String pid = mid.getPartitionId(); + String offset = mid.getOffset(); + emittedOffset.append(pid + "_" + offset + ","); + return null; + } - @Override - public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) { - } + @Override + public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) { + } - @Override - public void flush() { - // NO-OP - } + @Override + public void flush() { + // NO-OP + } - @Override - public void reportError(Throwable arg0) { - } + @Override + public void reportError(Throwable arg0) { + } - @Override - public long getPendingCount() { - return 0; - } + @Override + public long getPendingCount() { + return 0; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java index 0abe3a4..7bfcd93 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java @@ -15,40 +15,40 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import java.util.HashMap; import java.util.Map; -import org.apache.storm.eventhubs.spout.IStateStore; - /** * A state store mocker */ public class StateStoreMock implements IStateStore { - Map<String, String> myDataMap; - @Override - public void open() { - myDataMap = new HashMap<String, String>(); - } + Map<String, String> myDataMap; - @Override - public void close() { - myDataMap = null; - } + @Override + public void open() { + myDataMap = new HashMap<String, String>(); + } + + @Override + public void close() { + myDataMap = null; + } - @Override - public void saveData(String path, String data) { - if(myDataMap != null) { - myDataMap.put(path, data); + @Override + public void saveData(String path, String data) { + if (myDataMap != null) { + myDataMap.put(path, data); + } } - } - @Override - public String readData(String path) { - if(myDataMap != null) { - return myDataMap.get(path); + @Override + public String readData(String path) { + if (myDataMap != null) { + return myDataMap.get(path); + } + return null; } - return null; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java index 926337b..ab54f66 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import org.junit.After; @@ -25,23 +26,23 @@ import static org.junit.Assert.assertTrue; public class TestEventData { - @Before - public void setUp() throws Exception { - } + @Before + public void setUp() throws Exception { + } - @After - public void tearDown() throws Exception { - } + @After + public void tearDown() throws Exception { + } - @Test - public void testEventDataComparision() { + @Test + public void testEventDataComparision() { - MessageId messageId1 = MessageId.create(null, "3", 1); - EventDataWrap eventData1 = EventDataWrap.create(null, messageId1); + MessageId messageId1 = MessageId.create(null, "3", 1); + EventDataWrap eventData1 = EventDataWrap.create(null, messageId1); - MessageId messageId2 = MessageId.create(null, "13", 2); - EventDataWrap eventData2 = EventDataWrap.create(null, messageId2); + MessageId messageId2 = MessageId.create(null, "13", 2); + EventDataWrap eventData2 = EventDataWrap.create(null, messageId2); - assertTrue(eventData2.compareTo(eventData1) > 0); - } + assertTrue(eventData2.compareTo(eventData1) > 0); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java index a7b3588..06f3574 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import org.junit.After; @@ -24,50 +25,51 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; - public class TestEventHubSpout { - @Before - public void setUp() throws Exception { - } + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testSpoutConfig() { + EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "pas\\s+w/ord", + "namespace", "entityname", 16); + conf.setZkConnectionString("zookeeper"); + conf.setCheckpointIntervalInSeconds(1); + assertEquals(conf.getConnectionString(), + "Endpoint=amqps://namespace.servicebus.windows.net;EntityPath=entityname;SharedAccessKeyName=username;" + + "SharedAccessKey=pas\\s+w/ord;OperationTimeout=PT1M;RetryPolicy=Default"); + } + + @Test + public void testSpoutBasic() { + //This spout owns 2 partitions: 6 and 14 + EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(16, 8, 6, 10); + String result = mock.execute("r6,f6_0,a6_1,a6_2,a14_0,a14_2,r4,f14_1,r2"); + assertEquals("6_0,14_0,6_1,14_1,6_2,14_2,6_0,14_3,6_3,14_4,6_4,14_1", result); + } - @After - public void tearDown() throws Exception { - } - - @Test - public void testSpoutConfig() { - EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "pas\\s+w/ord", - "namespace", "entityname", 16); - conf.setZkConnectionString("zookeeper"); - conf.setCheckpointIntervalInSeconds(1); - assertEquals(conf.getConnectionString(), "Endpoint=amqps://namespace.servicebus.windows.net;EntityPath=entityname;SharedAccessKeyName=username;SharedAccessKey=pas\\s+w/ord;OperationTimeout=PT1M;RetryPolicy=Default"); - } + @Test + public void testSpoutCheckpoint() { + //Make sure that even though nextTuple() doesn't receive valid data, + //the offset will be checkpointed after checkpointInterval seconds. - @Test - public void testSpoutBasic() { - //This spout owns 2 partitions: 6 and 14 - EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(16, 8, 6,10); - String result = mock.execute("r6,f6_0,a6_1,a6_2,a14_0,a14_2,r4,f14_1,r2"); - assertEquals("6_0,14_0,6_1,14_1,6_2,14_2,6_0,14_3,6_3,14_4,6_4,14_1", result); - } - - @Test - public void testSpoutCheckpoint() { - //Make sure that even though nextTuple() doesn't receive valid data, - //the offset will be checkpointed after checkpointInterval seconds. - - //This spout owns 1 partitions: 6 - EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(8, 8, 6, 1); - String result = mock.execute("r6,a6_0,a6_1,a6_2"); - try { - Thread.sleep(2000); + //This spout owns 1 partitions: 6 + EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(8, 8, 6, 1); + String result = mock.execute("r6,a6_0,a6_1,a6_2"); + try { + Thread.sleep(2000); + } catch (InterruptedException ex) { + } + EventHubReceiverMock.setPause(true); + result = mock.execute("r3"); + EventHubReceiverMock.setPause(false); + assertEquals("3", mock.getCheckpoint(6)); } - catch(InterruptedException ex) {} - EventHubReceiverMock.setPause(true); - result = mock.execute("r3"); - EventHubReceiverMock.setPause(false); - assertEquals("3", mock.getCheckpoint(6)); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java index 5b28f1d..e4b16ef 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java @@ -15,103 +15,105 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; + +import static org.junit.Assert.assertEquals; public class TestPartitionManager { - @Before - public void setUp() throws Exception { - } + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testPartitionManagerNoFail() { + PartitionManagerCallerMock mock + = new PartitionManagerCallerMock("1"); + String result = mock.execute("r,r,r,a0,a1,a2,r"); + assertEquals("0,1,2,3", result); + } + + @Test + public void testPartitionManagerResend() { + PartitionManagerCallerMock mock + = new PartitionManagerCallerMock("1"); + String result = mock.execute("r,a0,r,r,r,f3,r,f2,f1,r,r,a1,a2,a3,r"); + assertEquals("0,1,2,3,3,1,2,4", result); + } + + @Test + public void testPMCheckpointWithPending() { + PartitionManagerCallerMock mock + = new PartitionManagerCallerMock("1"); + mock.execute("r,r,r"); + //no ack, so return the first of pending list + assertEquals("0", mock.checkpoint()); + mock.execute("a0,a2"); + //still need to return the first of pending list + assertEquals("1", mock.checkpoint()); + } + + @Test + public void testPMCheckpointWithResend() { + PartitionManagerCallerMock mock + = new PartitionManagerCallerMock("1"); + mock.execute("r,r,r,f2,f1,f0"); + //pending is empty, return the smallest in toResend + assertEquals("0", mock.checkpoint()); + mock.execute("r,a0"); + //pending is still empty + assertEquals("1", mock.checkpoint()); + } + + @Test + public void testPMCheckpointWithPendingAndResend() { + PartitionManagerCallerMock mock + = new PartitionManagerCallerMock("1"); + mock.execute("r,r,r,f2,f1"); + //return the smaller of pending and toResend + assertEquals("0", mock.checkpoint()); + mock.execute("a0,r"); + //now pending: [3], toResend: [1,2] + assertEquals("1", mock.checkpoint()); + } - @After - public void tearDown() throws Exception { - } + @Test + public void testPMCheckpointWithNoPendingAndNoResend() { + PartitionManagerCallerMock mock + = new PartitionManagerCallerMock("1"); + //if no event sent, no checkpoint shall be created + assertEquals(null, mock.checkpoint()); + mock.execute("r,r,r,f2,f1,r,r,a2,a1,a0"); + //all events are sent successfully, return last sent offset + assertEquals("2", mock.checkpoint()); + } - @Test - public void testPartitionManagerNoFail() { - PartitionManagerCallerMock mock - = new PartitionManagerCallerMock("1"); - String result = mock.execute("r,r,r,a0,a1,a2,r"); - assertEquals("0,1,2,3", result); - } + @Test + public void testPartitionManagerMaxPendingMessages() { + PartitionManagerCallerMock mock + = new PartitionManagerCallerMock("1"); + String result = mock.execute("r1024"); + //any receive call after exceeding max pending messages results in null + result = mock.execute("r2"); + assertEquals("null,null", result); + result = mock.execute("a0,a1,r2"); + assertEquals("1024,1025", result); + } - @Test - public void testPartitionManagerResend() { - PartitionManagerCallerMock mock - = new PartitionManagerCallerMock("1"); - String result = mock.execute("r,a0,r,r,r,f3,r,f2,f1,r,r,a1,a2,a3,r"); - assertEquals("0,1,2,3,3,1,2,4", result); - } - - @Test - public void testPMCheckpointWithPending() { - PartitionManagerCallerMock mock - = new PartitionManagerCallerMock("1"); - mock.execute("r,r,r"); - //no ack, so return the first of pending list - assertEquals("0", mock.checkpoint()); - mock.execute("a0,a2"); - //still need to return the first of pending list - assertEquals("1", mock.checkpoint()); - } - - @Test - public void testPMCheckpointWithResend() { - PartitionManagerCallerMock mock - = new PartitionManagerCallerMock("1"); - mock.execute("r,r,r,f2,f1,f0"); - //pending is empty, return the smallest in toResend - assertEquals("0", mock.checkpoint()); - mock.execute("r,a0"); - //pending is still empty - assertEquals("1", mock.checkpoint()); - } - - @Test - public void testPMCheckpointWithPendingAndResend() { - PartitionManagerCallerMock mock - = new PartitionManagerCallerMock("1"); - mock.execute("r,r,r,f2,f1"); - //return the smaller of pending and toResend - assertEquals("0", mock.checkpoint()); - mock.execute("a0,r"); - //now pending: [3], toResend: [1,2] - assertEquals("1", mock.checkpoint()); - } - - @Test - public void testPMCheckpointWithNoPendingAndNoResend() { - PartitionManagerCallerMock mock - = new PartitionManagerCallerMock("1"); - //if no event sent, no checkpoint shall be created - assertEquals(null, mock.checkpoint()); - mock.execute("r,r,r,f2,f1,r,r,a2,a1,a0"); - //all events are sent successfully, return last sent offset - assertEquals("2", mock.checkpoint()); - } - - @Test - public void testPartitionManagerMaxPendingMessages() { - PartitionManagerCallerMock mock - = new PartitionManagerCallerMock("1"); - String result = mock.execute("r1024"); - //any receive call after exceeding max pending messages results in null - result = mock.execute("r2"); - assertEquals("null,null", result); - result = mock.execute("a0,a1,r2"); - assertEquals("1024,1025", result); - } - - @Test - public void testPartitionManagerEnqueueTimeFilter() { - PartitionManagerCallerMock mock - = new PartitionManagerCallerMock("1", 123456); - String result = mock.execute("r2"); - assertEquals("123457,123458", result); - } + @Test + public void testPartitionManagerEnqueueTimeFilter() { + PartitionManagerCallerMock mock + = new PartitionManagerCallerMock("1", 123456); + String result = mock.execute("r2"); + assertEquals("123457,123458", result); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java index b6ce76a..31cbc32 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java @@ -15,79 +15,78 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -package org.apache.storm.eventhubs.trident; -import static org.junit.Assert.*; +package org.apache.storm.eventhubs.trident; import java.util.Map; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - import org.apache.storm.eventhubs.spout.EventHubReceiverMock; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiver; import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; public class TestTransactionalTridentEmitter { - private TransactionalTridentEventHubEmitter emitter; - private Partition partition; - private TridentCollectorMock collectorMock; - private final int batchSize = 32; + private final int batchSize = 32; + private TransactionalTridentEventHubEmitter emitter; + private Partition partition; + private TridentCollectorMock collectorMock; + + @Before + public void setUp() throws Exception { + EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password", + "namespace", "entityname", 16, "zookeeper"); + conf.setTopologyName("TestTopo"); + IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() { + @Override + public IEventHubReceiver create(EventHubSpoutConfig config, + String partitionId) { + return new EventHubReceiverMock(partitionId); + } + }; + partition = new Partition(conf, "0"); + emitter = new TransactionalTridentEventHubEmitter(conf, batchSize, null, recvFactory); + collectorMock = new TridentCollectorMock(); + } + + @After + public void tearDown() throws Exception { + emitter.close(); + emitter = null; + } + + @Test + public void testEmitInSequence() { + //test the happy path, emit batches in sequence + Map<String, Object> meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null); + String collected = collectorMock.getBuffer(); + assertTrue(collected.startsWith("message" + 0)); + //System.out.println("collected: " + collected); + collectorMock.clear(); - @Before - public void setUp() throws Exception { - EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password", - "namespace", "entityname", 16, "zookeeper"); - conf.setTopologyName("TestTopo"); - IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() { - @Override - public IEventHubReceiver create(EventHubSpoutConfig config, - String partitionId) { - return new EventHubReceiverMock(partitionId); - } - }; - partition = new Partition(conf, "0"); - emitter = new TransactionalTridentEventHubEmitter(conf, batchSize, null, recvFactory); - collectorMock = new TridentCollectorMock(); - } + emitter.emitPartitionBatchNew(null, collectorMock, partition, meta); + collected = collectorMock.getBuffer(); + //System.out.println("collected: " + collected); + assertTrue(collected.startsWith("message" + batchSize)); + } - @After - public void tearDown() throws Exception { - emitter.close(); - emitter = null; - } + @Test + public void testReEmit() { + //test we can re-emit the second batch + Map<String, Object> meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null); + collectorMock.clear(); - @Test - public void testEmitInSequence() { - //test the happy path, emit batches in sequence - Map<String, Object> meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null); - String collected = collectorMock.getBuffer(); - assertTrue(collected.startsWith("message"+0)); - //System.out.println("collected: " + collected); - collectorMock.clear(); - - emitter.emitPartitionBatchNew(null, collectorMock, partition, meta); - collected = collectorMock.getBuffer(); - //System.out.println("collected: " + collected); - assertTrue(collected.startsWith("message"+batchSize)); - } + //emit second batch + Map<String, Object> meta1 = emitter.emitPartitionBatchNew(null, collectorMock, partition, meta); + String collected0 = collectorMock.getBuffer(); + collectorMock.clear(); - @Test - public void testReEmit() { - //test we can re-emit the second batch - Map<String, Object> meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null); - collectorMock.clear(); - - //emit second batch - Map<String, Object> meta1 = emitter.emitPartitionBatchNew(null, collectorMock, partition, meta); - String collected0 = collectorMock.getBuffer(); - collectorMock.clear(); - - //re-emit second batch - emitter.emitPartitionBatch(null, collectorMock, partition, meta1); - String collected1 = collectorMock.getBuffer(); - assertTrue(collected0.equals(collected1)); - } + //re-emit second batch + emitter.emitPartitionBatch(null, collectorMock, partition, meta1); + String collected1 = collectorMock.getBuffer(); + assertTrue(collected0.equals(collected1)); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java index bd5b07f..59f7002 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java @@ -15,43 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.trident; import java.util.List; - import org.apache.storm.trident.operation.TridentCollector; /** * A mock of TridentCollector */ public class TridentCollectorMock implements TridentCollector { - StringBuilder buffer; - - public TridentCollectorMock() { - buffer = new StringBuilder(); - } - - @Override - public void emit(List<Object> tuples) { - for(Object o: tuples) { - buffer.append(o.toString()); + StringBuilder buffer; + + public TridentCollectorMock() { + buffer = new StringBuilder(); + } + + @Override + public void emit(List<Object> tuples) { + for (Object o : tuples) { + buffer.append(o.toString()); + } + } + + @Override + public void flush() { + // NO-OP + } + + @Override + public void reportError(Throwable arg0) { + } + + public void clear() { + buffer.setLength(0); + } + + public String getBuffer() { + return buffer.toString(); } - } - - @Override - public void flush() { - // NO-OP - } - - @Override - public void reportError(Throwable arg0) { - } - - public void clear() { - buffer.setLength(0); - } - - public String getBuffer() { - return buffer.toString(); - } }
