http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
index 4709b5e..8d00397 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventCount.java
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.samples;
 
 import java.io.FileReader;
 import java.util.Properties;
-
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.eventhubs.samples.bolt.GlobalCountBolt;
@@ -30,119 +30,115 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.topology.TopologyBuilder;
 
 /**
- * The basic scenario topology that uses EventHubSpout with PartialCountBolt
- * and GlobalCountBolt.
- * To submit this topology:
- * storm jar {jarfile} {classname} {topologyname} {spoutconffile}
+ * The basic scenario topology that uses EventHubSpout with PartialCountBolt and GlobalCountBolt. To submit this topology: storm jar
+ * {jarfile} {classname} {topologyname} {spoutconffile}
  */
 public class EventCount {
-  protected EventHubSpoutConfig spoutConfig;
-  protected int numWorkers;
-  
-  public EventCount() {
-  }
-  
-  protected void readEHConfig(String[] args) throws Exception {
-         Properties properties = new Properties();
-    if(args.length > 1) {
-      properties.load(new FileReader(args[1]));
+    protected EventHubSpoutConfig spoutConfig;
+    protected int numWorkers;
+
+    public EventCount() {
     }
-    else {
-      properties.load(EventCount.class.getClassLoader().getResourceAsStream(
-          "Config.properties"));
+
+    public static void main(String[] args) throws Exception {
+        EventCount scenario = new EventCount();
+        scenario.runScenario(args);
     }
 
-    String username = properties.getProperty("eventhubspout.username");
-    String password = properties.getProperty("eventhubspout.password");
-    String namespaceName = properties.getProperty("eventhubspout.namespace");
-    String entityPath = properties.getProperty("eventhubspout.entitypath");
-    String targetFqnAddress = 
properties.getProperty("eventhubspout.targetfqnaddress");
-    String zkEndpointAddress = 
properties.getProperty("zookeeper.connectionstring");
-    int partitionCount = 
Integer.parseInt(properties.getProperty("eventhubspout.partitions.count"));
-    int checkpointIntervalInSeconds = 
Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval"));
-    int receiverCredits = 
Integer.parseInt(properties.getProperty("eventhub.receiver.credits"));
-    String maxPendingMsgsPerPartitionStr = 
properties.getProperty("eventhubspout.max.pending.messages.per.partition");
-    if(maxPendingMsgsPerPartitionStr == null) {
-      maxPendingMsgsPerPartitionStr = "1024";
+    protected void readEHConfig(String[] args) throws Exception {
+        Properties properties = new Properties();
+        if (args.length > 1) {
+            properties.load(new FileReader(args[1]));
+        } else {
+            
properties.load(EventCount.class.getClassLoader().getResourceAsStream(
+                "Config.properties"));
+        }
+
+        String username = properties.getProperty("eventhubspout.username");
+        String password = properties.getProperty("eventhubspout.password");
+        String namespaceName = 
properties.getProperty("eventhubspout.namespace");
+        String entityPath = properties.getProperty("eventhubspout.entitypath");
+        String targetFqnAddress = 
properties.getProperty("eventhubspout.targetfqnaddress");
+        String zkEndpointAddress = 
properties.getProperty("zookeeper.connectionstring");
+        int partitionCount = 
Integer.parseInt(properties.getProperty("eventhubspout.partitions.count"));
+        int checkpointIntervalInSeconds = 
Integer.parseInt(properties.getProperty("eventhubspout.checkpoint.interval"));
+        int receiverCredits = 
Integer.parseInt(properties.getProperty("eventhub.receiver.credits"));
+        String maxPendingMsgsPerPartitionStr = 
properties.getProperty("eventhubspout.max.pending.messages.per.partition");
+        if (maxPendingMsgsPerPartitionStr == null) {
+            maxPendingMsgsPerPartitionStr = "1024";
+        }
+        int maxPendingMsgsPerPartition = 
Integer.parseInt(maxPendingMsgsPerPartitionStr);
+        String enqueueTimeDiffStr = 
properties.getProperty("eventhub.receiver.filter.timediff");
+        if (enqueueTimeDiffStr == null) {
+            enqueueTimeDiffStr = "0";
+        }
+        int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr);
+        long enqueueTimeFilter = 0;
+        if (enqueueTimeDiff != 0) {
+            enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff * 1000;
+        }
+        String consumerGroupName = 
properties.getProperty("eventhubspout.consumer.group.name");
+
+        System.out.println("Eventhub spout config: ");
+        System.out.println("  partition count: " + partitionCount);
+        System.out.println("  checkpoint interval: " + 
checkpointIntervalInSeconds);
+        System.out.println("  receiver credits: " + receiverCredits);
+        spoutConfig = new EventHubSpoutConfig(username, password,
+                                              namespaceName, entityPath, 
partitionCount, zkEndpointAddress,
+                                              checkpointIntervalInSeconds, 
receiverCredits, maxPendingMsgsPerPartition,
+                                              enqueueTimeFilter);
+
+        if (targetFqnAddress != null) {
+            spoutConfig.setTargetAddress(targetFqnAddress);
+        }
+        spoutConfig.setConsumerGroupName(consumerGroupName);
+
+        //set the number of workers to be the same as partition number.
+        //the idea is to have a spout and a partial count bolt co-exist in one
+        //worker to avoid shuffling messages across workers in storm cluster.
+        numWorkers = spoutConfig.getPartitionCount();
+
+        if (args.length > 0) {
+            //set topology name so that sample Trident topology can use it as stream name.
+            spoutConfig.setTopologyName(args[0]);
+        }
     }
-    int maxPendingMsgsPerPartition = 
Integer.parseInt(maxPendingMsgsPerPartitionStr);
-    String enqueueTimeDiffStr = 
properties.getProperty("eventhub.receiver.filter.timediff");
-    if(enqueueTimeDiffStr == null) {
-      enqueueTimeDiffStr = "0";
+
+    protected EventHubSpout createEventHubSpout() {
+        EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
+        return eventHubSpout;
     }
-    int enqueueTimeDiff = Integer.parseInt(enqueueTimeDiffStr);
-    long enqueueTimeFilter = 0;
-    if(enqueueTimeDiff != 0) {
-      enqueueTimeFilter = System.currentTimeMillis() - enqueueTimeDiff*1000;
+
+    protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+        topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, 
spoutConfig.getPartitionCount())
+                       .setNumTasks(spoutConfig.getPartitionCount());
+        topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), 
spoutConfig.getPartitionCount())
+                       
.localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount());
+        topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
+                       .globalGrouping("PartialCountBolt").setNumTasks(1);
+        return topologyBuilder.createTopology();
     }
-    String consumerGroupName = 
properties.getProperty("eventhubspout.consumer.group.name");
-    
-    System.out.println("Eventhub spout config: ");
-    System.out.println("  partition count: " + partitionCount);
-    System.out.println("  checkpoint interval: " + 
checkpointIntervalInSeconds);
-    System.out.println("  receiver credits: " + receiverCredits);
-    spoutConfig = new EventHubSpoutConfig(username, password,
-      namespaceName, entityPath, partitionCount, zkEndpointAddress,
-      checkpointIntervalInSeconds, receiverCredits, maxPendingMsgsPerPartition,
-      enqueueTimeFilter);
-
-    if(targetFqnAddress != null)
-    {
-      spoutConfig.setTargetAddress(targetFqnAddress);      
+
+    protected void submitTopology(String[] args, StormTopology topology) 
throws Exception {
+        Config config = new Config();
+        config.setDebug(false);
+        //Enable metrics
+        config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
+
+        String topoName = "test";
+        if (args.length > 0) {
+            topoName = args[0];
+        }
+        config.setNumWorkers(numWorkers);
+        StormSubmitter.submitTopology(topoName, config, topology);
     }
-    spoutConfig.setConsumerGroupName(consumerGroupName);
-
-    //set the number of workers to be the same as partition number.
-    //the idea is to have a spout and a partial count bolt co-exist in one
-    //worker to avoid shuffling messages across workers in storm cluster.
-    numWorkers = spoutConfig.getPartitionCount();
-    
-    if(args.length > 0) {
-      //set topology name so that sample Trident topology can use it as stream name.
-      spoutConfig.setTopologyName(args[0]);
+
+    protected void runScenario(String[] args) throws Exception {
+        readEHConfig(args);
+        EventHubSpout eventHubSpout = createEventHubSpout();
+        StormTopology topology = buildTopology(eventHubSpout);
+        submitTopology(args, topology);
     }
-       }
-  
-  protected EventHubSpout createEventHubSpout() {
-    EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
-    return eventHubSpout;
-  }
-       
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-
-    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, 
spoutConfig.getPartitionCount())
-      .setNumTasks(spoutConfig.getPartitionCount());
-    topologyBuilder.setBolt("PartialCountBolt", new PartialCountBolt(), 
spoutConfig.getPartitionCount())
-      
.localOrShuffleGrouping("EventHubsSpout").setNumTasks(spoutConfig.getPartitionCount());
-    topologyBuilder.setBolt("GlobalCountBolt", new GlobalCountBolt(), 1)
-      .globalGrouping("PartialCountBolt").setNumTasks(1);
-    return topologyBuilder.createTopology();
-  }
-       
-  protected void submitTopology(String[] args, StormTopology topology) throws 
Exception {
-      Config config = new Config();
-      config.setDebug(false);
-      //Enable metrics
-      
config.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class,
 1);
-
-      String topoName = "test";
-      if (args.length > 0) {
-          topoName = args[0];
-      }
-      config.setNumWorkers(numWorkers);
-      StormSubmitter.submitTopology(topoName, config, topology);
-  }
-  
-  protected void runScenario(String[] args) throws Exception{
-    readEHConfig(args);
-    EventHubSpout eventHubSpout = createEventHubSpout();
-    StormTopology topology = buildTopology(eventHubSpout);
-    submitTopology(args, topology);
-  }
-
-  public static void main(String[] args) throws Exception {
-    EventCount scenario = new EventCount();
-    scenario.runScenario(args);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
index 32dc0d5..55d4cf2 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/EventHubLoop.java
@@ -15,38 +15,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
-package org.apache.storm.eventhubs.samples;
 
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.topology.TopologyBuilder;
+package org.apache.storm.eventhubs.samples;
 
 import org.apache.storm.eventhubs.bolt.EventHubBolt;
 import org.apache.storm.eventhubs.bolt.EventHubBoltConfig;
 import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 
 /**
  * A sample topology that loops message back to EventHub
  */
 public class EventHubLoop extends EventCount {
 
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
+    public static void main(String[] args) throws Exception {
+        EventHubLoop scenario = new EventHubLoop();
+        scenario.runScenario(args);
+    }
+
+    @Override
+    protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+
+        topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, 
spoutConfig.getPartitionCount())
+                       .setNumTasks(spoutConfig.getPartitionCount());
+        EventHubBoltConfig boltConfig = new 
EventHubBoltConfig(spoutConfig.getConnectionString(),
+                                                               
spoutConfig.getEntityPath(), true);
 
-    topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, 
spoutConfig.getPartitionCount())
-      .setNumTasks(spoutConfig.getPartitionCount());
-    EventHubBoltConfig boltConfig = new 
EventHubBoltConfig(spoutConfig.getConnectionString(),
-        spoutConfig.getEntityPath(), true);
-    
-    EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
-    int boltTasks = spoutConfig.getPartitionCount();
-    topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
-      .localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
-    return topologyBuilder.createTopology();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    EventHubLoop scenario = new EventHubLoop();
-    scenario.runScenario(args);
-  }
+        EventHubBolt eventHubBolt = new EventHubBolt(boltConfig);
+        int boltTasks = spoutConfig.getPartitionCount();
+        topologyBuilder.setBolt("EventHubsBolt", eventHubBolt, boltTasks)
+                       
.localOrShuffleGrouping("EventHubsSpout").setNumTasks(boltTasks);
+        return topologyBuilder.createTopology();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
index a433b8b..abcad5a 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/OpaqueTridentEventCount.java
@@ -15,39 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.samples;
 
+import 
org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpout;
+import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.builtin.Count;
 import org.apache.storm.trident.operation.builtin.Sum;
 import org.apache.storm.trident.testing.MemoryMapState;
-import org.apache.storm.generated.StormTopology;
 import org.apache.storm.tuple.Fields;
 
-import 
org.apache.storm.eventhubs.samples.TransactionalTridentEventCount.LoggingFilter;
-import org.apache.storm.eventhubs.spout.EventHubSpout;
-import org.apache.storm.eventhubs.trident.OpaqueTridentEventHubSpout;
-
 /**
  * A simple Trident topology uses OpaqueTridentEventHubSpout
  */
 public class OpaqueTridentEventCount extends EventCount {
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TridentTopology topology = new TridentTopology();
-    
-    OpaqueTridentEventHubSpout spout = new 
OpaqueTridentEventHubSpout(spoutConfig);
-    TridentState state = topology.newStream("stream-" + 
spoutConfig.getTopologyName(), spout)
-        .parallelismHint(spoutConfig.getPartitionCount())
-        .aggregate(new Count(), new Fields("partial-count"))
-        .persistentAggregate(new MemoryMapState.Factory(), new 
Fields("partial-count"), new Sum(), new Fields("count"));
-    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got 
count: ", 10000));
-    return topology.build();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
-    scenario.runScenario(args);
-  }
+    public static void main(String[] args) throws Exception {
+        OpaqueTridentEventCount scenario = new OpaqueTridentEventCount();
+        scenario.runScenario(args);
+    }
+
+    @Override
+    protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+        TridentTopology topology = new TridentTopology();
+
+        OpaqueTridentEventHubSpout spout = new 
OpaqueTridentEventHubSpout(spoutConfig);
+        TridentState state = topology.newStream("stream-" + 
spoutConfig.getTopologyName(), spout)
+                                     
.parallelismHint(spoutConfig.getPartitionCount())
+                                     .aggregate(new Count(), new 
Fields("partial-count"))
+                                     .persistentAggregate(new 
MemoryMapState.Factory(), new Fields("partial-count"), new Sum(),
+                                                          new Fields("count"));
+        state.newValuesStream().each(new Fields("count"), new 
LoggingFilter("got count: ", 10000));
+        return topology.build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
index 718c229..646dc9c 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/TransactionalTridentEventCount.java
@@ -15,17 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
-package org.apache.storm.eventhubs.samples;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.tuple.Fields;
+package org.apache.storm.eventhubs.samples;
 
 import org.apache.storm.eventhubs.spout.EventHubSpout;
 import org.apache.storm.eventhubs.trident.TransactionalTridentEventHubSpout;
-
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.BaseFilter;
@@ -33,49 +28,54 @@ import org.apache.storm.trident.operation.builtin.Count;
 import org.apache.storm.trident.operation.builtin.Sum;
 import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A simple Trident topology uses TransactionalTridentEventHubSpout
  */
 public class TransactionalTridentEventCount extends EventCount {
-  public static class LoggingFilter extends BaseFilter {
-    private static final long serialVersionUID = 1L;
-    private static final Logger logger = 
LoggerFactory.getLogger(LoggingFilter.class);
-    private final String prefix;
-    private final long logIntervalMs;
-    private long lastTime;
-    public LoggingFilter(String prefix, int logIntervalMs) {
-      this.prefix = prefix;
-      this.logIntervalMs = logIntervalMs;
-      lastTime = System.nanoTime();
+    public static void main(String[] args) throws Exception {
+        TransactionalTridentEventCount scenario = new 
TransactionalTridentEventCount();
+        scenario.runScenario(args);
     }
 
     @Override
-    public boolean isKeep(TridentTuple tuple) {
-      long now = System.nanoTime();
-      if(logIntervalMs < (now - lastTime) / 1000000) {
-        logger.info(prefix + tuple.toString());
-        lastTime = now;
-      }
-      return false;
+    protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
+        TridentTopology topology = new TridentTopology();
+
+        TransactionalTridentEventHubSpout spout = new 
TransactionalTridentEventHubSpout(spoutConfig);
+        TridentState state = topology.newStream("stream-" + 
spoutConfig.getTopologyName(), spout)
+                                     
.parallelismHint(spoutConfig.getPartitionCount())
+                                     .aggregate(new Count(), new 
Fields("partial-count"))
+                                     .persistentAggregate(new 
MemoryMapState.Factory(), new Fields("partial-count"), new Sum(),
+                                                          new Fields("count"));
+        state.newValuesStream().each(new Fields("count"), new 
LoggingFilter("got count: ", 10000));
+        return topology.build();
+    }
+
+    public static class LoggingFilter extends BaseFilter {
+        private static final long serialVersionUID = 1L;
+        private static final Logger logger = 
LoggerFactory.getLogger(LoggingFilter.class);
+        private final String prefix;
+        private final long logIntervalMs;
+        private long lastTime;
+
+        public LoggingFilter(String prefix, int logIntervalMs) {
+            this.prefix = prefix;
+            this.logIntervalMs = logIntervalMs;
+            lastTime = System.nanoTime();
+        }
+
+        @Override
+        public boolean isKeep(TridentTuple tuple) {
+            long now = System.nanoTime();
+            if (logIntervalMs < (now - lastTime) / 1000000) {
+                logger.info(prefix + tuple.toString());
+                lastTime = now;
+            }
+            return false;
+        }
     }
-  }
-  
-  @Override
-  protected StormTopology buildTopology(EventHubSpout eventHubSpout) {
-    TridentTopology topology = new TridentTopology();
-    
-    TransactionalTridentEventHubSpout spout = new 
TransactionalTridentEventHubSpout(spoutConfig);
-    TridentState state = topology.newStream("stream-" + 
spoutConfig.getTopologyName(), spout)
-        .parallelismHint(spoutConfig.getPartitionCount())
-        .aggregate(new Count(), new Fields("partial-count"))
-        .persistentAggregate(new MemoryMapState.Factory(), new 
Fields("partial-count"), new Sum(), new Fields("count"));
-    state.newValuesStream().each(new Fields("count"), new LoggingFilter("got 
count: ", 10000));
-    return topology.build();
-  }
-  
-  public static void main(String[] args) throws Exception {
-    TransactionalTridentEventCount scenario = new 
TransactionalTridentEventCount();
-    scenario.runScenario(args);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
index 9d94f5f..cf3fb76 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/GlobalCountBolt.java
@@ -15,15 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.samples.bolt;
 
 import java.util.HashMap;
 import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.task.TopologyContext;
@@ -31,58 +27,61 @@ import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Globally count number of messages
  */
 public class GlobalCountBolt extends BaseBasicBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(GlobalCountBolt.class);
-  private long globalCount;
-  private long globalCountDiff;
-  private long lastMetricsTime;
-  private long throughput;
-  
-  @Override
-  public void prepare(Map<String, Object> config, TopologyContext context) {
-    globalCount = 0;
-    globalCountDiff = 0;
-    lastMetricsTime = System.nanoTime();
-    context.registerMetric("GlobalMessageCount", new IMetric() {
-      @Override
-      public Object getValueAndReset() {
-        long now = System.nanoTime();
-        long millis = (now - lastMetricsTime) / 1000000;
-        throughput = globalCountDiff / millis * 1000;
-        Map<String, Object> values = new HashMap<>();
-        values.put("global_count", globalCount);
-        values.put("throughput", throughput);
-        lastMetricsTime = now;
-        globalCountDiff = 0;
-        return values;
-      }
-  }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
-  }
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory
+        .getLogger(GlobalCountBolt.class);
+    private long globalCount;
+    private long globalCountDiff;
+    private long lastMetricsTime;
+    private long throughput;
 
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleUtils.isTick(tuple)) {
-      return;
+    @Override
+    public void prepare(Map<String, Object> config, TopologyContext context) {
+        globalCount = 0;
+        globalCountDiff = 0;
+        lastMetricsTime = System.nanoTime();
+        context.registerMetric("GlobalMessageCount", new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                long now = System.nanoTime();
+                long millis = (now - lastMetricsTime) / 1000000;
+                throughput = globalCountDiff / millis * 1000;
+                Map<String, Object> values = new HashMap<>();
+                values.put("global_count", globalCount);
+                values.put("throughput", throughput);
+                lastMetricsTime = now;
+                globalCountDiff = 0;
+                return values;
+            }
+        }, (Integer) 
config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
     }
 
-    int partial = (Integer)tuple.getValueByField("partial_count");
-    globalCount += partial;
-    globalCountDiff += partial;
-    if((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
-      //metrics has just been collected, let's also log it
-      logger.info("Current throughput (messages/second): " + throughput);
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        if (TupleUtils.isTick(tuple)) {
+            return;
+        }
+
+        int partial = (Integer) tuple.getValueByField("partial_count");
+        globalCount += partial;
+        globalCountDiff += partial;
+        if ((globalCountDiff == partial) && (globalCount != globalCountDiff)) {
+            //metrics has just been collected, let's also log it
+            logger.info("Current throughput (messages/second): " + throughput);
+        }
     }
-  }
 
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    
-  }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
index 3763c69..e51dc08 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/bolt/PartialCountBolt.java
@@ -15,14 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.samples.bolt;
 
 import java.util.Map;
-
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -30,39 +26,42 @@ import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Partially count number of messages from EventHubs
  */
 public class PartialCountBolt extends BaseBasicBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(PartialCountBolt.class);
-  private static final int PartialCountBatchSize = 1000; 
-  
-  private int partialCount;
-  
-  @Override
-  public void prepare(Map<String, Object> topoConf, TopologyContext context) {
-    partialCount = 0;
-  }
-  
-  @Override
-  public void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleUtils.isTick(tuple)) {
-      return;
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory
+        .getLogger(PartialCountBolt.class);
+    private static final int PartialCountBatchSize = 1000;
+
+    private int partialCount;
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context) 
{
+        partialCount = 0;
     }
 
-    partialCount++;
-    if(partialCount == PartialCountBatchSize) {
-      collector.emit(new Values(PartialCountBatchSize));
-      partialCount = 0;
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        if (TupleUtils.isTick(tuple)) {
+            return;
+        }
+
+        partialCount++;
+        if (partialCount == PartialCountBatchSize) {
+            collector.emit(new Values(PartialCountBatchSize));
+            partialCount = 0;
+        }
     }
-  }
 
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declare(new Fields("partial_count"));
-  }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("partial_count"));
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
index 09fb60f..7f6e697 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
@@ -15,71 +15,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.microsoft.azure.eventhubs.EventData;
-
 import java.util.Map;
 
 /**
- * A mock receiver that emits fake data with offset starting from given offset
- * and increase by 1 each time.
+ * A mock receiver that emits fake data with offset starting from given offset 
and increasing by 1 each time.
  */
 public class EventHubReceiverMock implements IEventHubReceiver {
-  private static boolean isPaused = false;
-  private final String partitionId;
-  private long currentOffset;
-  private boolean isOpen;
+    private static boolean isPaused = false;
+    private final String partitionId;
+    private long currentOffset;
+    private boolean isOpen;
+
+    public EventHubReceiverMock(String pid) {
+        partitionId = pid;
+        isPaused = false;
+    }
 
-  public EventHubReceiverMock(String pid) {
-    partitionId = pid;
-    isPaused = false;
-  }
-  
-  /**
-   * Use this method to pause/resume all the receivers.
-   * If paused all receiver will return null.
-   * @param val
-   */
-  public static void setPause(boolean val) {
-    isPaused = val;
-  }
+    /**
+     * Use this method to pause/resume all the receivers. If paused, all 
receivers will return null.
+     *
+     * @param val true to pause all receivers, false to resume them
+     */
+    public static void setPause(boolean val) {
+        isPaused = val;
+    }
 
-  @Override
-  public void open(IEventFilter filter) throws EventHubException {
-    currentOffset = filter.getOffset() != null ?
+    @Override
+    public void open(IEventFilter filter) throws EventHubException {
+        currentOffset = filter.getOffset() != null ?
             Long.parseLong(filter.getOffset()) :
             filter.getTime().toEpochMilli();
-    isOpen = true;
-  }
-
-  @Override
-  public void close() {
-    isOpen = false;
-  }
+        isOpen = true;
+    }
 
-  @Override
-  public boolean isOpen() {
-    return isOpen;
-  }
+    @Override
+    public void close() {
+        isOpen = false;
+    }
 
-  @Override
-  public EventDataWrap receive() {
-    if(isPaused) {
-      return null;
+    @Override
+    public boolean isOpen() {
+        return isOpen;
     }
 
-    currentOffset++;
+    @Override
+    public EventDataWrap receive() {
+        if (isPaused) {
+            return null;
+        }
+
+        currentOffset++;
 
-    //the body of the message is "message" + currentOffset, e.g. "message123"
+        //the body of the message is "message" + currentOffset, e.g. 
"message123"
 
-    MessageId mid = new MessageId(partitionId, "" + currentOffset, 
currentOffset);
-    EventData ed = new EventData(("message" + currentOffset).getBytes());
-    return EventDataWrap.create(ed,mid);
-  }
-  
-  @Override
-  public Map<String, Object> getMetricsData() {
-    return null;
-  }
+        MessageId mid = new MessageId(partitionId, "" + currentOffset, 
currentOffset);
+        EventData ed = new EventData(("message" + currentOffset).getBytes());
+        return EventDataWrap.create(ed, mid);
+    }
+
+    @Override
+    public Map<String, Object> getMetricsData() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
index 7eb625c..16f3b80 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubSpoutCallerMock.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -23,74 +24,69 @@ import org.apache.storm.spout.SpoutOutputCollector;
  * Mocks EventHubSpout's caller (storm framework)
  */
 public class EventHubSpoutCallerMock {
-  public static final String statePathPrefix = 
"/eventhubspout/TestTopo/namespace/entityname/partitions/";
-  EventHubSpout spout;
-  private IStateStore stateStore;
-  private SpoutOutputCollectorMock collector;
-  
-  public EventHubSpoutCallerMock(int totalPartitions,
-      int totalTasks, int taskIndex, int checkpointInterval) {
-    stateStore = new StateStoreMock();
-    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
-        "namespace", "entityname", totalPartitions, "zookeeper", 
checkpointInterval, 1024);
-    conf.setTopologyName("TestTopo");
-    
-    IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() {
-      @Override
-      public IEventHubReceiver create(EventHubSpoutConfig config,
-          String partitionId) {
-        return new EventHubReceiverMock(partitionId);
-      }
-    };
-    //mock state store and receiver
-    spout = new EventHubSpout(conf, stateStore, null, recvFactory);
-    
-    collector = new SpoutOutputCollectorMock();
-    
-    try {
-      spout.preparePartitions(null, totalTasks, taskIndex, new 
SpoutOutputCollector(collector));
-    }
-    catch(Exception ex) {
-    }
-  }
-  
-  /**
-   * Execute a sequence of calls to EventHubSpout.
-   * 
-   * @param callSequence: is represented as a string of commands, 
-   * e.g. "r,r,r,r,a1,f2,...". The commands are:
-   * r[N]: receive() called N times
-   * aP_X: ack(P_X), partition: P, offset: X
-   * fP_Y: fail(P_Y), partition: P, offset: Y
-   */
-  public String execute(String callSequence) {
-    String[] cmds = callSequence.split(",");
-    for(String cmd : cmds) {
-      if(cmd.startsWith("r")) {
-        int count = 1;
-        if(cmd.length() > 1) {
-          count = Integer.parseInt(cmd.substring(1));
+    public static final String statePathPrefix = 
"/eventhubspout/TestTopo/namespace/entityname/partitions/";
+    EventHubSpout spout;
+    private IStateStore stateStore;
+    private SpoutOutputCollectorMock collector;
+
+    public EventHubSpoutCallerMock(int totalPartitions,
+                                   int totalTasks, int taskIndex, int 
checkpointInterval) {
+        stateStore = new StateStoreMock();
+        EventHubSpoutConfig conf = new EventHubSpoutConfig("username", 
"password",
+                                                           "namespace", 
"entityname", totalPartitions, "zookeeper", checkpointInterval,
+                                                           1024);
+        conf.setTopologyName("TestTopo");
+
+        IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() {
+            @Override
+            public IEventHubReceiver create(EventHubSpoutConfig config,
+                                            String partitionId) {
+                return new EventHubReceiverMock(partitionId);
+            }
+        };
+        //mock state store and receiver
+        spout = new EventHubSpout(conf, stateStore, null, recvFactory);
+
+        collector = new SpoutOutputCollectorMock();
+
+        try {
+            spout.preparePartitions(null, totalTasks, taskIndex, new 
SpoutOutputCollector(collector));
+        } catch (Exception ex) {
         }
-        for(int i=0; i<count; ++i) {
-          spout.nextTuple();
+    }
+
+    /**
+     * Execute a sequence of calls to EventHubSpout.
+     *
+     * @param callSequence: is represented as a string of commands, e.g. 
"r6,a6_1,f6_0,...". The commands are: r[N]: nextTuple() called N
+     *                      times; aP_X: ack(P_X), partition: P, offset: X; 
fP_Y: fail(P_Y), partition: P, offset: Y
+     */
+    public String execute(String callSequence) {
+        String[] cmds = callSequence.split(",");
+        for (String cmd : cmds) {
+            if (cmd.startsWith("r")) {
+                int count = 1;
+                if (cmd.length() > 1) {
+                    count = Integer.parseInt(cmd.substring(1));
+                }
+                for (int i = 0; i < count; ++i) {
+                    spout.nextTuple();
+                }
+            } else if (cmd.startsWith("a")) {
+                String[] midStrs = cmd.substring(1).split("_");
+                MessageId msgId = new MessageId(midStrs[0], midStrs[1], 
Long.parseLong(midStrs[1]));
+                spout.ack(msgId);
+            } else if (cmd.startsWith("f")) {
+                String[] midStrs = cmd.substring(1).split("_");
+                MessageId msgId = new MessageId(midStrs[0], midStrs[1], 
Long.parseLong(midStrs[1]));
+                spout.fail(msgId);
+            }
         }
-      }
-      else if(cmd.startsWith("a")) {
-        String[] midStrs = cmd.substring(1).split("_");
-        MessageId msgId = new MessageId(midStrs[0], midStrs[1], 
Long.parseLong(midStrs[1]));
-        spout.ack(msgId);
-      }
-      else if(cmd.startsWith("f")) {
-        String[] midStrs = cmd.substring(1).split("_");
-        MessageId msgId = new MessageId(midStrs[0], midStrs[1], 
Long.parseLong(midStrs[1]));
-        spout.fail(msgId);
-      }
+        return collector.getOffsetSequenceAndReset();
+    }
+
+    public String getCheckpoint(int partitionIndex) {
+        String statePath = statePathPrefix + partitionIndex;
+        return stateStore.readData(statePath);
     }
-    return collector.getOffsetSequenceAndReset();
-  }
-  
-  public String getCheckpoint(int partitionIndex) {
-    String statePath = statePathPrefix + partitionIndex;
-    return stateStore.readData(statePath);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
index 467461c..364990e 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java
@@ -15,87 +15,81 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 /**
  * This mock exercises PartitionManager
  */
 public class PartitionManagerCallerMock {
-  public static final String statePath = 
"/eventhubspout/TestTopo/namespace/entityname/partitions/1";
-  private IPartitionManager pm;
-  private IStateStore stateStore;
+    public static final String statePath = 
"/eventhubspout/TestTopo/namespace/entityname/partitions/1";
+    private IPartitionManager pm;
+    private IStateStore stateStore;
 
-  public PartitionManagerCallerMock(String partitionId) {
-    this(partitionId, 0);
-  }
-  
-  public PartitionManagerCallerMock(String partitionId, long 
enqueueTimeFilter) {
-    EventHubReceiverMock receiver = new EventHubReceiverMock(partitionId);
-    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
-      "namespace", "entityname", 16, "zookeeper", 10, 1024, 1024, 
enqueueTimeFilter);
-    conf.setTopologyName("TestTopo");
-    stateStore = new StateStoreMock();
-    this.pm = new PartitionManager(conf, partitionId, stateStore, receiver);
-    
-    stateStore.open();
-    try {
-      pm.open();
+    public PartitionManagerCallerMock(String partitionId) {
+        this(partitionId, 0);
     }
-    catch (Exception ex) {
+
+    public PartitionManagerCallerMock(String partitionId, long 
enqueueTimeFilter) {
+        EventHubReceiverMock receiver = new EventHubReceiverMock(partitionId);
+        EventHubSpoutConfig conf = new EventHubSpoutConfig("username", 
"password",
+                                                           "namespace", 
"entityname", 16, "zookeeper", 10, 1024, 1024, enqueueTimeFilter);
+        conf.setTopologyName("TestTopo");
+        stateStore = new StateStoreMock();
+        this.pm = new PartitionManager(conf, partitionId, stateStore, 
receiver);
+
+        stateStore.open();
+        try {
+            pm.open();
+        } catch (Exception ex) {
+        }
     }
-  }
-  
-  /**
-   * Execute a sequence of calls to Partition Manager.
-   * 
-   * @param callSequence: is represented as a string of commands, 
-   * e.g. "r,r,r,r,a1,f2,...". The commands are:
-   * r[N]: receive() called N times
-   * aX: ack(X)
-   * fY: fail(Y)
-   * 
-   * @return the sequence of messages the receive call returns
-   */
-  public String execute(String callSequence) {
-    
-    String[] cmds = callSequence.split(",");
-    StringBuilder ret = new StringBuilder();
-    for(String cmd : cmds) {
-      if(cmd.startsWith("r")) {
-        int count = 1;
-        if(cmd.length() > 1) {
-          count = Integer.parseInt(cmd.substring(1));
+
+    /**
+     * Execute a sequence of calls to Partition Manager.
+     *
+     * @param callSequence: is represented as a string of commands, e.g. 
"r,r,r,r,a1,f2,...". The commands are: r[N]: receive() called N
+     *                      times; aX: ack(X); fY: fail(Y)
+     * @return the sequence of messages the receive call returns
+     */
+    public String execute(String callSequence) {
+
+        String[] cmds = callSequence.split(",");
+        StringBuilder ret = new StringBuilder();
+        for (String cmd : cmds) {
+            if (cmd.startsWith("r")) {
+                int count = 1;
+                if (cmd.length() > 1) {
+                    count = Integer.parseInt(cmd.substring(1));
+                }
+                for (int i = 0; i < count; ++i) {
+                    EventDataWrap ed = pm.receive();
+                    if (ed == null) {
+                        ret.append("null,");
+                    } else {
+                        ret.append(ed.getMessageId().getOffset());
+                        ret.append(",");
+                    }
+                }
+            } else if (cmd.startsWith("a")) {
+                pm.ack(cmd.substring(1));
+            } else if (cmd.startsWith("f")) {
+                pm.fail(cmd.substring(1));
+            }
         }
-        for(int i=0; i<count; ++i) {
-          EventDataWrap ed = pm.receive();
-          if(ed == null) {
-            ret.append("null,");
-          }
-          else {
-            ret.append(ed.getMessageId().getOffset());
-            ret.append(",");
-          }
+        if (ret.length() > 0) {
+            ret.setLength(ret.length() - 1);
         }
-      }
-      else if(cmd.startsWith("a")) {
-        pm.ack(cmd.substring(1));
-      }
-      else if(cmd.startsWith("f")) {
-        pm.fail(cmd.substring(1));
-      }
+        return ret.toString();
     }
-    if(ret.length() > 0) {
-      ret.setLength(ret.length()-1);
+
+    /**
+     * Exercise the IPartitionManager.checkpoint() method
+     *
+     * @return the offset that we write to state store
+     */
+    public String checkpoint() {
+        pm.checkpoint();
+        return stateStore.readData(statePath);
     }
-    return ret.toString();
-  }
-  
-  /**
-   * Exercise the IPartitionManager.checkpoint() method
-   * @return the offset that we write to state store
-   */
-  public String checkpoint() {
-    pm.checkpoint();
-    return stateStore.readData(statePath);
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
index 621d6a8..37dfdd5 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/SpoutOutputCollectorMock.java
@@ -15,57 +15,57 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import java.util.List;
-
 import org.apache.storm.spout.ISpoutOutputCollector;
 
 /**
  * Mock of ISpoutOutputCollector
  */
 public class SpoutOutputCollectorMock implements ISpoutOutputCollector {
-  //comma separated offsets
-  StringBuilder emittedOffset;
-  
-  public SpoutOutputCollectorMock() {
-    emittedOffset = new StringBuilder();
-  }
-  
-  public String getOffsetSequenceAndReset() {
-    String ret = null;
-    if(emittedOffset.length() > 0) {
-      emittedOffset.setLength(emittedOffset.length()-1);
-      ret = emittedOffset.toString();
-      emittedOffset.setLength(0);
+    //comma separated offsets
+    StringBuilder emittedOffset;
+
+    public SpoutOutputCollectorMock() {
+        emittedOffset = new StringBuilder();
+    }
+
+    public String getOffsetSequenceAndReset() {
+        String ret = null;
+        if (emittedOffset.length() > 0) {
+            emittedOffset.setLength(emittedOffset.length() - 1);
+            ret = emittedOffset.toString();
+            emittedOffset.setLength(0);
+        }
+        return ret;
     }
-    return ret;
-  }
 
-  @Override
-  public List<Integer> emit(String streamId, List<Object> tuple, Object 
messageId) {
-    MessageId mid = (MessageId)messageId;
-    String pid = mid.getPartitionId();
-    String offset = mid.getOffset();
-    emittedOffset.append(pid+"_"+offset+",");
-    return null;
-  }
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object 
messageId) {
+        MessageId mid = (MessageId) messageId;
+        String pid = mid.getPartitionId();
+        String offset = mid.getOffset();
+        emittedOffset.append(pid + "_" + offset + ",");
+        return null;
+    }
 
-  @Override
-  public void emitDirect(int arg0, String arg1, List<Object> arg2, Object 
arg3) {
-  }
+    @Override
+    public void emitDirect(int arg0, String arg1, List<Object> arg2, Object 
arg3) {
+    }
 
-  @Override
-  public void flush() {
-    // NO-OP
-  }
+    @Override
+    public void flush() {
+        // NO-OP
+    }
 
-  @Override
-  public void reportError(Throwable arg0) {
-  }
+    @Override
+    public void reportError(Throwable arg0) {
+    }
 
-  @Override
-  public long getPendingCount() {
-    return 0;
-  }
+    @Override
+    public long getPendingCount() {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
index 0abe3a4..7bfcd93 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/StateStoreMock.java
@@ -15,40 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.storm.eventhubs.spout.IStateStore;
-
 /**
  * A state store mocker
  */
 public class StateStoreMock implements IStateStore {
-  Map<String, String> myDataMap;
-  @Override
-  public void open() {
-    myDataMap = new HashMap<String, String>();
-  }
+    Map<String, String> myDataMap;
 
-  @Override
-  public void close() {
-    myDataMap = null;
-  }
+    @Override
+    public void open() {
+        myDataMap = new HashMap<String, String>();
+    }
+
+    @Override
+    public void close() {
+        myDataMap = null;
+    }
 
-  @Override
-  public void saveData(String path, String data) {
-    if(myDataMap != null) {
-      myDataMap.put(path, data);
+    @Override
+    public void saveData(String path, String data) {
+        if (myDataMap != null) {
+            myDataMap.put(path, data);
+        }
     }
-  }
 
-  @Override
-  public String readData(String path) {
-    if(myDataMap != null) {
-      return myDataMap.get(path);
+    @Override
+    public String readData(String path) {
+        if (myDataMap != null) {
+            return myDataMap.get(path);
+        }
+        return null;
     }
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
index 926337b..ab54f66 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import org.junit.After;
@@ -25,23 +26,23 @@ import static org.junit.Assert.assertTrue;
 
 public class TestEventData {
 
-  @Before
-  public void setUp() throws Exception {
-  }
+    @Before
+    public void setUp() throws Exception {
+    }
 
-  @After
-  public void tearDown() throws Exception {
-  }
+    @After
+    public void tearDown() throws Exception {
+    }
 
-  @Test
-  public void testEventDataComparision() {
+    @Test
+    public void testEventDataComparision() {
 
-       MessageId messageId1 = MessageId.create(null, "3", 1);
-       EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
+        MessageId messageId1 = MessageId.create(null, "3", 1);
+        EventDataWrap eventData1 = EventDataWrap.create(null, messageId1);
 
-       MessageId messageId2 = MessageId.create(null, "13", 2);
-       EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
+        MessageId messageId2 = MessageId.create(null, "13", 2);
+        EventDataWrap eventData2 = EventDataWrap.create(null, messageId2);
 
-       assertTrue(eventData2.compareTo(eventData1) > 0);
-  }
+        assertTrue(eventData2.compareTo(eventData1) > 0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
index a7b3588..06f3574 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import org.junit.After;
@@ -24,50 +25,51 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 
 
-
 public class TestEventHubSpout {
 
-  @Before
-  public void setUp() throws Exception {
-  }
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testSpoutConfig() {
+        EventHubSpoutConfig conf = new EventHubSpoutConfig("username", 
"pas\\s+w/ord",
+                                                           "namespace", 
"entityname", 16);
+        conf.setZkConnectionString("zookeeper");
+        conf.setCheckpointIntervalInSeconds(1);
+        assertEquals(conf.getConnectionString(),
+                     
"Endpoint=amqps://namespace.servicebus.windows.net;EntityPath=entityname;SharedAccessKeyName=username;"
 +
+                     
"SharedAccessKey=pas\\s+w/ord;OperationTimeout=PT1M;RetryPolicy=Default");
+    }
+
+    @Test
+    public void testSpoutBasic() {
+        //This spout owns 2 partitions: 6 and 14
+        EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(16, 8, 6, 
10);
+        String result = 
mock.execute("r6,f6_0,a6_1,a6_2,a14_0,a14_2,r4,f14_1,r2");
+        assertEquals("6_0,14_0,6_1,14_1,6_2,14_2,6_0,14_3,6_3,14_4,6_4,14_1", 
result);
+    }
 
-  @After
-  public void tearDown() throws Exception {
-  }
-  
-  @Test
-  public void testSpoutConfig() {
-    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", 
"pas\\s+w/ord",
-        "namespace", "entityname", 16);
-    conf.setZkConnectionString("zookeeper");
-    conf.setCheckpointIntervalInSeconds(1);
-    assertEquals(conf.getConnectionString(), 
"Endpoint=amqps://namespace.servicebus.windows.net;EntityPath=entityname;SharedAccessKeyName=username;SharedAccessKey=pas\\s+w/ord;OperationTimeout=PT1M;RetryPolicy=Default");
-  }
+    @Test
+    public void testSpoutCheckpoint() {
+        //Make sure that even though nextTuple() doesn't receive valid data,
+        //the offset will be checkpointed after checkpointInterval seconds.
 
-  @Test
-  public void testSpoutBasic() {
-    //This spout owns 2 partitions: 6 and 14
-    EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(16, 8, 6,10);
-    String result = mock.execute("r6,f6_0,a6_1,a6_2,a14_0,a14_2,r4,f14_1,r2");
-    assertEquals("6_0,14_0,6_1,14_1,6_2,14_2,6_0,14_3,6_3,14_4,6_4,14_1", 
result);
-  }
-  
-  @Test
-  public void testSpoutCheckpoint() {
-    //Make sure that even though nextTuple() doesn't receive valid data,
-    //the offset will be checkpointed after checkpointInterval seconds.
-    
-    //This spout owns 1 partitions: 6
-    EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(8, 8, 6, 1);
-    String result = mock.execute("r6,a6_0,a6_1,a6_2");
-    try {
-      Thread.sleep(2000);
+        //This spout owns 1 partition: 6
+        EventHubSpoutCallerMock mock = new EventHubSpoutCallerMock(8, 8, 6, 1);
+        String result = mock.execute("r6,a6_0,a6_1,a6_2");
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException ex) {
+        }
+        EventHubReceiverMock.setPause(true);
+        result = mock.execute("r3");
+        EventHubReceiverMock.setPause(false);
+        assertEquals("3", mock.getCheckpoint(6));
     }
-    catch(InterruptedException ex) {}
-    EventHubReceiverMock.setPause(true);
-    result = mock.execute("r3");
-    EventHubReceiverMock.setPause(false);
-    assertEquals("3", mock.getCheckpoint(6));
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
index 5b28f1d..e4b16ef 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestPartitionManager.java
@@ -15,103 +15,105 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertEquals;
 
 public class TestPartitionManager {
 
-  @Before
-  public void setUp() throws Exception {
-  }
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testPartitionManagerNoFail() {
+        PartitionManagerCallerMock mock
+            = new PartitionManagerCallerMock("1");
+        String result = mock.execute("r,r,r,a0,a1,a2,r");
+        assertEquals("0,1,2,3", result);
+    }
+
+    @Test
+    public void testPartitionManagerResend() {
+        PartitionManagerCallerMock mock
+            = new PartitionManagerCallerMock("1");
+        String result = mock.execute("r,a0,r,r,r,f3,r,f2,f1,r,r,a1,a2,a3,r");
+        assertEquals("0,1,2,3,3,1,2,4", result);
+    }
+
+    @Test
+    public void testPMCheckpointWithPending() {
+        PartitionManagerCallerMock mock
+            = new PartitionManagerCallerMock("1");
+        mock.execute("r,r,r");
+        //no ack, so return the first of pending list
+        assertEquals("0", mock.checkpoint());
+        mock.execute("a0,a2");
+        //still need to return the first of pending list
+        assertEquals("1", mock.checkpoint());
+    }
+
+    @Test
+    public void testPMCheckpointWithResend() {
+        PartitionManagerCallerMock mock
+            = new PartitionManagerCallerMock("1");
+        mock.execute("r,r,r,f2,f1,f0");
+        //pending is empty, return the smallest in toResend
+        assertEquals("0", mock.checkpoint());
+        mock.execute("r,a0");
+        //pending is still empty
+        assertEquals("1", mock.checkpoint());
+    }
+
+    @Test
+    public void testPMCheckpointWithPendingAndResend() {
+        PartitionManagerCallerMock mock
+            = new PartitionManagerCallerMock("1");
+        mock.execute("r,r,r,f2,f1");
+        //return the smaller of pending and toResend
+        assertEquals("0", mock.checkpoint());
+        mock.execute("a0,r");
+        //now pending: [3], toResend: [1,2]
+        assertEquals("1", mock.checkpoint());
+    }
 
-  @After
-  public void tearDown() throws Exception {
-  }
+    @Test
+    public void testPMCheckpointWithNoPendingAndNoResend() {
+        PartitionManagerCallerMock mock
+            = new PartitionManagerCallerMock("1");
+        //if no event sent, no checkpoint shall be created
+        assertEquals(null, mock.checkpoint());
+        mock.execute("r,r,r,f2,f1,r,r,a2,a1,a0");
+        //all events are sent successfully, return last sent offset
+        assertEquals("2", mock.checkpoint());
+    }
 
-  @Test
-  public void testPartitionManagerNoFail() {
-    PartitionManagerCallerMock mock
-      = new PartitionManagerCallerMock("1");
-    String result = mock.execute("r,r,r,a0,a1,a2,r");
-    assertEquals("0,1,2,3", result);
-  }
+    @Test
+    public void testPartitionManagerMaxPendingMessages() {
+        PartitionManagerCallerMock mock
+            = new PartitionManagerCallerMock("1");
+        String result = mock.execute("r1024");
+        //any receive call after exceeding max pending messages results in null
+        result = mock.execute("r2");
+        assertEquals("null,null", result);
+        result = mock.execute("a0,a1,r2");
+        assertEquals("1024,1025", result);
+    }
 
-  @Test
-  public void testPartitionManagerResend() {
-    PartitionManagerCallerMock mock
-      = new PartitionManagerCallerMock("1");
-    String result = mock.execute("r,a0,r,r,r,f3,r,f2,f1,r,r,a1,a2,a3,r");
-    assertEquals("0,1,2,3,3,1,2,4", result);
-  }
-  
-  @Test
-  public void testPMCheckpointWithPending() {
-    PartitionManagerCallerMock mock
-      = new PartitionManagerCallerMock("1");
-    mock.execute("r,r,r");
-    //no ack, so return the first of pending list
-    assertEquals("0", mock.checkpoint());
-    mock.execute("a0,a2");
-    //still need to return the first of pending list
-    assertEquals("1", mock.checkpoint());
-  }
-  
-  @Test
-  public void testPMCheckpointWithResend() {
-    PartitionManagerCallerMock mock
-      = new PartitionManagerCallerMock("1");
-    mock.execute("r,r,r,f2,f1,f0");
-    //pending is empty, return the smallest in toResend
-    assertEquals("0", mock.checkpoint());
-    mock.execute("r,a0");
-    //pending is still empty
-    assertEquals("1", mock.checkpoint());
-  }
-  
-  @Test
-  public void testPMCheckpointWithPendingAndResend() {
-    PartitionManagerCallerMock mock
-      = new PartitionManagerCallerMock("1");
-    mock.execute("r,r,r,f2,f1");
-    //return the smaller of pending and toResend
-    assertEquals("0", mock.checkpoint());
-    mock.execute("a0,r");
-    //now pending: [3], toResend: [1,2]
-    assertEquals("1", mock.checkpoint());
-  }
-  
-  @Test
-  public void testPMCheckpointWithNoPendingAndNoResend() {
-    PartitionManagerCallerMock mock
-      = new PartitionManagerCallerMock("1");
-    //if no event sent, no checkpoint shall be created
-    assertEquals(null, mock.checkpoint());
-    mock.execute("r,r,r,f2,f1,r,r,a2,a1,a0");
-    //all events are sent successfully, return last sent offset
-    assertEquals("2", mock.checkpoint());
-  }
-  
-  @Test
-  public void testPartitionManagerMaxPendingMessages() {
-    PartitionManagerCallerMock mock
-      = new PartitionManagerCallerMock("1");
-    String result = mock.execute("r1024");
-    //any receive call after exceeding max pending messages results in null
-    result = mock.execute("r2");
-    assertEquals("null,null", result);
-    result = mock.execute("a0,a1,r2");
-    assertEquals("1024,1025", result);
-  }
-  
-  @Test
-  public void testPartitionManagerEnqueueTimeFilter() {
-    PartitionManagerCallerMock mock
-      = new PartitionManagerCallerMock("1", 123456);
-    String result = mock.execute("r2");
-    assertEquals("123457,123458", result);
-  }
+    @Test
+    public void testPartitionManagerEnqueueTimeFilter() {
+        PartitionManagerCallerMock mock
+            = new PartitionManagerCallerMock("1", 123456);
+        String result = mock.execute("r2");
+        assertEquals("123457,123458", result);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
index b6ce76a..31cbc32 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TestTransactionalTridentEmitter.java
@@ -15,79 +15,78 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
-package org.apache.storm.eventhubs.trident;
 
-import static org.junit.Assert.*;
+package org.apache.storm.eventhubs.trident;
 
 import java.util.Map;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.storm.eventhubs.spout.EventHubReceiverMock;
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventHubReceiver;
 import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
 
 public class TestTransactionalTridentEmitter {
-  private TransactionalTridentEventHubEmitter emitter;
-  private Partition partition;
-  private TridentCollectorMock collectorMock;
-  private final int batchSize = 32;
+    private final int batchSize = 32;
+    private TransactionalTridentEventHubEmitter emitter;
+    private Partition partition;
+    private TridentCollectorMock collectorMock;
+
+    @Before
+    public void setUp() throws Exception {
+        EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
+                                                           "namespace", "entityname", 16, "zookeeper");
+        conf.setTopologyName("TestTopo");
+        IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() {
+            @Override
+            public IEventHubReceiver create(EventHubSpoutConfig config,
+                                            String partitionId) {
+                return new EventHubReceiverMock(partitionId);
+            }
+        };
+        partition = new Partition(conf, "0");
+        emitter = new TransactionalTridentEventHubEmitter(conf, batchSize, null, recvFactory);
+        collectorMock = new TridentCollectorMock();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        emitter.close();
+        emitter = null;
+    }
+
+    @Test
+    public void testEmitInSequence() {
+        //test the happy path, emit batches in sequence
+        Map<String, Object> meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null);
+        String collected = collectorMock.getBuffer();
+        assertTrue(collected.startsWith("message" + 0));
+        //System.out.println("collected: " + collected);
+        collectorMock.clear();
 
-  @Before
-  public void setUp() throws Exception {
-    EventHubSpoutConfig conf = new EventHubSpoutConfig("username", "password",
-        "namespace", "entityname", 16, "zookeeper");
-    conf.setTopologyName("TestTopo");
-    IEventHubReceiverFactory recvFactory = new IEventHubReceiverFactory() {
-      @Override
-      public IEventHubReceiver create(EventHubSpoutConfig config,
-          String partitionId) {
-        return new EventHubReceiverMock(partitionId);
-      }
-    };
-    partition = new Partition(conf, "0");
-    emitter = new TransactionalTridentEventHubEmitter(conf, batchSize, null, recvFactory);
-    collectorMock = new TridentCollectorMock();
-  }
+        emitter.emitPartitionBatchNew(null, collectorMock, partition, meta);
+        collected = collectorMock.getBuffer();
+        //System.out.println("collected: " + collected);
+        assertTrue(collected.startsWith("message" + batchSize));
+    }
 
-  @After
-  public void tearDown() throws Exception {
-    emitter.close();
-    emitter = null;
-  }
+    @Test
+    public void testReEmit() {
+        //test we can re-emit the second batch
+        Map<String, Object> meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null);
+        collectorMock.clear();
 
-  @Test
-  public void testEmitInSequence() {
-    //test the happy path, emit batches in sequence
-    Map<String, Object> meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null);
-    String collected = collectorMock.getBuffer();
-    assertTrue(collected.startsWith("message"+0));
-    //System.out.println("collected: " + collected);
-    collectorMock.clear();
-    
-    emitter.emitPartitionBatchNew(null, collectorMock, partition, meta);
-    collected = collectorMock.getBuffer();
-    //System.out.println("collected: " + collected);
-    assertTrue(collected.startsWith("message"+batchSize));
-  }
+        //emit second batch
+        Map<String, Object> meta1 = emitter.emitPartitionBatchNew(null, collectorMock, partition, meta);
+        String collected0 = collectorMock.getBuffer();
+        collectorMock.clear();
 
-  @Test
-  public void testReEmit() {
-    //test we can re-emit the second batch
-    Map<String, Object> meta = emitter.emitPartitionBatchNew(null, collectorMock, partition, null);
-    collectorMock.clear();
-    
-    //emit second batch
-    Map<String, Object> meta1 = emitter.emitPartitionBatchNew(null, collectorMock, partition, meta);
-    String collected0 = collectorMock.getBuffer();
-    collectorMock.clear();
-    
-    //re-emit second batch
-    emitter.emitPartitionBatch(null, collectorMock, partition, meta1);
-    String collected1 = collectorMock.getBuffer();
-    assertTrue(collected0.equals(collected1));
-  }
+        //re-emit second batch
+        emitter.emitPartitionBatch(null, collectorMock, partition, meta1);
+        String collected1 = collectorMock.getBuffer();
+        assertTrue(collected0.equals(collected1));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
index bd5b07f..59f7002 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/trident/TridentCollectorMock.java
@@ -15,43 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
 import java.util.List;
-
 import org.apache.storm.trident.operation.TridentCollector;
 
 /**
  * A mock of TridentCollector
  */
 public class TridentCollectorMock implements TridentCollector {
-  StringBuilder buffer;
-  
-  public TridentCollectorMock() {
-    buffer = new StringBuilder();
-  }
-  
-  @Override
-  public void emit(List<Object> tuples) {
-    for(Object o: tuples) {
-      buffer.append(o.toString());
+    StringBuilder buffer;
+
+    public TridentCollectorMock() {
+        buffer = new StringBuilder();
+    }
+
+    @Override
+    public void emit(List<Object> tuples) {
+        for (Object o : tuples) {
+            buffer.append(o.toString());
+        }
+    }
+
+    @Override
+    public void flush() {
+        // NO-OP
+    }
+
+    @Override
+    public void reportError(Throwable arg0) {
+    }
+
+    public void clear() {
+        buffer.setLength(0);
+    }
+
+    public String getBuffer() {
+        return buffer.toString();
     }
-  }
-
-  @Override
-  public void flush() {
-    // NO-OP
-  }
-
-  @Override
-  public void reportError(Throwable arg0) {
-  }
-
-  public void clear() {
-    buffer.setLength(0);
-  }
-  
-  public String getBuffer() {
-    return buffer.toString();
-  }
 }

Reply via email to