http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
index a4901ae..d25ebdd 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFactory.java
@@ -1,23 +1,18 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
 /**
- * 
+ *
  */
+
 package org.apache.storm.eventhubs.spout;
 
 import java.io.Serializable;
@@ -26,5 +21,5 @@ import java.io.Serializable;
  * An abstract factory to generate EventHubReceiver
  */
 public interface IEventHubReceiverFactory extends Serializable {
-  IEventHubReceiver create(EventHubSpoutConfig config, String partitionId);
+    IEventHubReceiver create(EventHubSpoutConfig config, String partitionId);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
index e99f20a..d83c0cb 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionCoordinator.java
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import java.util.List;
 
 public interface IPartitionCoordinator {
 
-  List<IPartitionManager> getMyPartitionManagers();
+    List<IPartitionManager> getMyPartitionManagers();
 
-  IPartitionManager getPartitionManager(String partitionId);
+    IPartitionManager getPartitionManager(String partitionId);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
index d391edd..345fd48 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java
@@ -15,23 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import java.util.Map;
 
 public interface IPartitionManager {
 
-  void open() throws Exception;
+    void open() throws Exception;
+
+    void close();
 
-  void close();
+    EventDataWrap receive();
 
-  EventDataWrap receive();
+    void checkpoint();
 
-  void checkpoint();
+    void ack(String offset);
 
-  void ack(String offset);
+    void fail(String offset);
 
-  void fail(String offset);
-  
-  Map<String, Object> getMetricsData();
+    Map<String, Object> getMetricsData();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
index dc136eb..c3ce2ec 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManagerFactory.java
@@ -1,23 +1,18 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
 /**
- * 
+ *
  */
+
 package org.apache.storm.eventhubs.spout;
 
 import java.io.Serializable;
@@ -26,8 +21,8 @@ import java.io.Serializable;
  * An interface of factory method to create IPartitionManager
  */
 public interface IPartitionManagerFactory extends Serializable {
-  IPartitionManager create(EventHubSpoutConfig spoutConfig,
-      String partitionId,
-      IStateStore stateStore,
-      IEventHubReceiver receiver);
+    IPartitionManager create(EventHubSpoutConfig spoutConfig,
+                             String partitionId,
+                             IStateStore stateStore,
+                             IEventHubReceiver receiver);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
index 03c7ae8..311ceb7 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IStateStore.java
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import java.io.Serializable;
 
 public interface IStateStore extends Serializable {
 
-  public void open();
+    public void open();
 
-  public void close();
+    public void close();
 
-  public void saveData(String path, String data);
+    public void saveData(String path, String data);
 
-  public String readData(String path);
+    public String readData(String path);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
index 59d5c71..0bb1f10 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/MessageId.java
@@ -15,42 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 public class MessageId {
 
-  private final String partitionId;
-  private final String offset;
-  private final long sequenceNumber;
-
-  public MessageId(
-    String partitionId,
-    String offset,
-    long sequenceNumber) {
-    this.partitionId = partitionId;
-    this.offset = offset;
-    this.sequenceNumber = sequenceNumber;
-  }
-
-  public static MessageId create(String partitionId, String offset, long sequenceNumber) {
-    return new MessageId(partitionId, offset, sequenceNumber);
-  }
-
-  public String getPartitionId() {
-    return this.partitionId;
-  }
-
-  public String getOffset() {
-    return this.offset;
-  }
-
-  public Long getSequenceNumber() {
-    return this.sequenceNumber;
-  }
-  
-  @Override
-  public String toString() {
-    return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s",
-      this.partitionId, this.offset, this.sequenceNumber);
-  }
+    private final String partitionId;
+    private final String offset;
+    private final long sequenceNumber;
+
+    public MessageId(
+        String partitionId,
+        String offset,
+        long sequenceNumber) {
+        this.partitionId = partitionId;
+        this.offset = offset;
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    public static MessageId create(String partitionId, String offset, long sequenceNumber) {
+        return new MessageId(partitionId, offset, sequenceNumber);
+    }
+
+    public String getPartitionId() {
+        return this.partitionId;
+    }
+
+    public String getOffset() {
+        return this.offset;
+    }
+
+    public Long getSequenceNumber() {
+        return this.sequenceNumber;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("PartitionId: %s, Offset: %s, SequenceNumber: %s",
+                             this.partitionId, this.offset, this.sequenceNumber);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
index 20e021a..c5c5084 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java
@@ -15,87 +15,87 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.eventhubs.spout;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.TreeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PartitionManager extends SimplePartitionManager {
-  private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class);
-  private final int ehReceiveTimeoutMs = 5000;
+    private static final Logger logger = LoggerFactory.getLogger(PartitionManager.class);
+    private final int ehReceiveTimeoutMs = 5000;
 
-  //all sent events are stored in pending
-  private final Map<String, EventDataWrap> pending;
-  //all failed events are put in toResend, which is sorted by event's offset
-  private final TreeSet<EventDataWrap> toResend;
+    //all sent events are stored in pending
+    private final Map<String, EventDataWrap> pending;
+    //all failed events are put in toResend, which is sorted by event's offset
+    private final TreeSet<EventDataWrap> toResend;
 
-  public PartitionManager(
-    EventHubSpoutConfig spoutConfig,
-    String partitionId,
-    IStateStore stateStore,
-    IEventHubReceiver receiver) {
+    public PartitionManager(
+        EventHubSpoutConfig spoutConfig,
+        String partitionId,
+        IStateStore stateStore,
+        IEventHubReceiver receiver) {
 
-    super(spoutConfig, partitionId, stateStore, receiver);
-    
-    this.pending = new LinkedHashMap<String, EventDataWrap>();
-    this.toResend = new TreeSet<EventDataWrap>();
-  }
+        super(spoutConfig, partitionId, stateStore, receiver);
 
-  @Override
-  public EventDataWrap receive() {
-    if(pending.size() >= config.getMaxPendingMsgsPerPartition()) {
-      return null;
+        this.pending = new LinkedHashMap<String, EventDataWrap>();
+        this.toResend = new TreeSet<EventDataWrap>();
     }
 
-    EventDataWrap eventDatawrap;
-    if (toResend.isEmpty()) {
-      eventDatawrap = receiver.receive();
-    } else {
-      eventDatawrap = toResend.pollFirst();
-    }
+    @Override
+    public EventDataWrap receive() {
+        if (pending.size() >= config.getMaxPendingMsgsPerPartition()) {
+            return null;
+        }
 
-    if (eventDatawrap != null) {
-      lastOffset = eventDatawrap.getMessageId().getOffset();
-      pending.put(lastOffset, eventDatawrap);
-    }
+        EventDataWrap eventDatawrap;
+        if (toResend.isEmpty()) {
+            eventDatawrap = receiver.receive();
+        } else {
+            eventDatawrap = toResend.pollFirst();
+        }
 
-    return eventDatawrap;
-  }
+        if (eventDatawrap != null) {
+            lastOffset = eventDatawrap.getMessageId().getOffset();
+            pending.put(lastOffset, eventDatawrap);
+        }
 
-  @Override
-  public void ack(String offset) {
-    pending.remove(offset);
-  }
+        return eventDatawrap;
+    }
 
-  @Override
-  public void fail(String offset) {
-    logger.warn("fail on " + offset);
-    EventDataWrap eventDataWrap = pending.remove(offset);
-    toResend.add(eventDataWrap);
-  }
-  
-  @Override
-  protected String getCompletedOffset() {
-    String offset = null;
-    
-    if(pending.size() > 0) {
-      //find the smallest offset in pending list
-      offset = pending.keySet().iterator().next();
+    @Override
+    public void ack(String offset) {
+        pending.remove(offset);
     }
-    if(toResend.size() > 0) {
-      //find the smallest offset in toResend list
-      String offset2 = toResend.first().getMessageId().getOffset();
-      if(offset == null || offset2.compareTo(offset) < 0) {
-        offset = offset2;
-      }
+
+    @Override
+    public void fail(String offset) {
+        logger.warn("fail on " + offset);
+        EventDataWrap eventDataWrap = pending.remove(offset);
+        toResend.add(eventDataWrap);
     }
-    if(offset == null) {
-      offset = lastOffset;
+
+    @Override
+    protected String getCompletedOffset() {
+        String offset = null;
+
+        if (pending.size() > 0) {
+            //find the smallest offset in pending list
+            offset = pending.keySet().iterator().next();
+        }
+        if (toResend.size() > 0) {
+            //find the smallest offset in toResend list
+            String offset2 = toResend.first().getMessageId().getOffset();
+            if (offset == null || offset2.compareTo(offset) < 0) {
+                offset = offset2;
+            }
+        }
+        if (offset == null) {
+            offset = lastOffset;
+        }
+        return offset;
     }
-    return offset;
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
index 25c4261..436aabe 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
@@ -15,119 +15,119 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
+import java.time.Instant;
+import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.time.Instant;
 
-import java.util.Map;
 /**
  * A simple partition manager that does not re-send failed messages
  */
 public class SimplePartitionManager implements IPartitionManager {
-  private static final Logger logger = LoggerFactory.getLogger(SimplePartitionManager.class);
-  private static final String statePathPrefix = "/eventhubspout";
-
-  protected final IEventHubReceiver receiver;
-  protected String lastOffset = "-1";
-  protected String committedOffset = "-1";
-  
-  protected final EventHubSpoutConfig config;
-  private final String partitionId;
-  private final IStateStore stateStore;
-  private final String statePath;
-  
-  public SimplePartitionManager(
-      EventHubSpoutConfig spoutConfig,
-      String partitionId,
-      IStateStore stateStore,
-      IEventHubReceiver receiver) {
-    this.receiver = receiver;
-    this.config = spoutConfig;
-    this.partitionId = partitionId;
-    this.statePath = this.getPartitionStatePath();
-    this.stateStore = stateStore;
-  }
-  
-  @Override
-  public void open() throws Exception {
-
-    //read from state store, if not found, use startingOffset
-    String offset = stateStore.readData(statePath);
-    logger.info("read offset from state store: " + offset);
-    if(offset == null) {
-      offset = FieldConstants.DefaultStartingOffset;
+    private static final Logger logger = LoggerFactory.getLogger(SimplePartitionManager.class);
+    private static final String statePathPrefix = "/eventhubspout";
+
+    protected final IEventHubReceiver receiver;
+    protected final EventHubSpoutConfig config;
+    private final String partitionId;
+    private final IStateStore stateStore;
+    private final String statePath;
+    protected String lastOffset = "-1";
+    protected String committedOffset = "-1";
+
+    public SimplePartitionManager(
+        EventHubSpoutConfig spoutConfig,
+        String partitionId,
+        IStateStore stateStore,
+        IEventHubReceiver receiver) {
+        this.receiver = receiver;
+        this.config = spoutConfig;
+        this.partitionId = partitionId;
+        this.statePath = this.getPartitionStatePath();
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void open() throws Exception {
+
+        //read from state store, if not found, use startingOffset
+        String offset = stateStore.readData(statePath);
+        logger.info("read offset from state store: " + offset);
+        if (offset == null) {
+            offset = FieldConstants.DefaultStartingOffset;
+        }
+        IEventFilter filter;
+        if (offset.equals(FieldConstants.DefaultStartingOffset)
+            && config.getEnqueueTimeFilter() != 0) {
+            filter = new EventHubFilter(Instant.ofEpochMilli(config.getEnqueueTimeFilter()));
+        } else {
+            filter = new EventHubFilter(offset);
+        }
+
+        receiver.open(filter);
+    }
+
+    @Override
+    public void close() {
+        this.receiver.close();
+        this.checkpoint();
     }
-    IEventFilter filter;
-    if (offset.equals(FieldConstants.DefaultStartingOffset)
-        && config.getEnqueueTimeFilter() != 0) {
-      filter = new EventHubFilter(Instant.ofEpochMilli(config.getEnqueueTimeFilter()));
+
+    @Override
+    public void checkpoint() {
+        String completedOffset = getCompletedOffset();
+        if (!committedOffset.equals(completedOffset)) {
+            logger.info("saving state " + completedOffset);
+            stateStore.saveData(statePath, completedOffset);
+            committedOffset = completedOffset;
+        }
+    }
+
+    protected String getCompletedOffset() {
+        return lastOffset;
     }
-    else{
-      filter = new EventHubFilter(offset);
+
+    @Override
+    public EventDataWrap receive() {
+        EventDataWrap eventDatawrap = receiver.receive();
+        if (eventDatawrap != null) {
+            lastOffset = eventDatawrap.getEventData().getSystemProperties().getOffset();
+        }
+        return eventDatawrap;
     }
 
-    receiver.open(filter);
-  }
-  
-  @Override
-  public void close() {
-    this.receiver.close();
-    this.checkpoint();
-  }
-  
-  @Override
-  public void checkpoint() {
-    String completedOffset = getCompletedOffset();
-    if(!committedOffset.equals(completedOffset)) {
-      logger.info("saving state " + completedOffset);
-      stateStore.saveData(statePath, completedOffset);
-      committedOffset = completedOffset;
+    @Override
+    public void ack(String offset) {
+        //do nothing
     }
-  }
-  
-  protected String getCompletedOffset() {
-    return lastOffset;
-  }
-
-  @Override
-  public EventDataWrap receive() {
-    EventDataWrap eventDatawrap = receiver.receive();
-    if (eventDatawrap != null) {
-      lastOffset = eventDatawrap.getEventData().getSystemProperties().getOffset();
+
+    @Override
+    public void fail(String offset) {
+        logger.warn("fail on " + offset);
+        //do nothing
+    }
+
+    private String getPartitionStatePath() {
+
+        // Partition state path =
+        // "/{prefix}/{topologyName}/{namespace}/{entityPath}/partitions/{partitionId}/state";
+        String namespace = config.getNamespace();
+        String entityPath = config.getEntityPath();
+        String topologyName = config.getTopologyName();
+
+        String partitionStatePath =
+            statePathPrefix + "/" + topologyName + "/" + namespace + "/" + entityPath + "/partitions/" + this.partitionId;
+
+        logger.info("partition state path: " + partitionStatePath);
+
+        return partitionStatePath;
+    }
+
+    @Override
+    public Map<String, Object> getMetricsData() {
+        return receiver.getMetricsData();
     }
-    return eventDatawrap;
-  }
-
-  @Override
-  public void ack(String offset) {
-    //do nothing
-  }
-
-  @Override
-  public void fail(String offset) {
-    logger.warn("fail on " + offset);
-    //do nothing
-  }
-  
-  private String getPartitionStatePath() {
-
-    // Partition state path = 
-    // "/{prefix}/{topologyName}/{namespace}/{entityPath}/partitions/{partitionId}/state";
-    String namespace = config.getNamespace();
-    String entityPath = config.getEntityPath();
-    String topologyName = config.getTopologyName();
-
-    String partitionStatePath = statePathPrefix + "/" + topologyName + "/" + namespace + "/" + entityPath + "/partitions/" + this.partitionId;
-
-    logger.info("partition state path: " + partitionStatePath);
-
-    return partitionStatePath;
-  }
-  
-  @Override
-  public Map<String, Object> getMetricsData() {
-    return receiver.getMetricsData();
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
index abbecac..54c0f7d 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
@@ -15,70 +15,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class StaticPartitionCoordinator implements IPartitionCoordinator {
 
-  private static final Logger logger = LoggerFactory.getLogger(StaticPartitionCoordinator.class);
+    private static final Logger logger = LoggerFactory.getLogger(StaticPartitionCoordinator.class);
 
-  protected final EventHubSpoutConfig config;
-  protected final int taskIndex;
-  protected final int totalTasks;
-  protected final List<IPartitionManager> partitionManagers;
-  protected final Map<String, IPartitionManager> partitionManagerMap;
-  protected final IStateStore stateStore;
+    protected final EventHubSpoutConfig config;
+    protected final int taskIndex;
+    protected final int totalTasks;
+    protected final List<IPartitionManager> partitionManagers;
+    protected final Map<String, IPartitionManager> partitionManagerMap;
+    protected final IStateStore stateStore;
 
-  public StaticPartitionCoordinator(
-    EventHubSpoutConfig spoutConfig,
-    int taskIndex,
-    int totalTasks,
-    IStateStore stateStore,
-    IPartitionManagerFactory pmFactory,
-    IEventHubReceiverFactory recvFactory) {
+    public StaticPartitionCoordinator(
+        EventHubSpoutConfig spoutConfig,
+        int taskIndex,
+        int totalTasks,
+        IStateStore stateStore,
+        IPartitionManagerFactory pmFactory,
+        IEventHubReceiverFactory recvFactory) {
 
-    this.config = spoutConfig;
-    this.taskIndex = taskIndex;
-    this.totalTasks = totalTasks;
-    this.stateStore = stateStore;
-    List<String> partitionIds = calculateParititionIdsToOwn();
-    partitionManagerMap = new HashMap<String, IPartitionManager>();
-    partitionManagers = new ArrayList<IPartitionManager>();
-    
-    for (String partitionId : partitionIds) {
-      IEventHubReceiver receiver = recvFactory.create(config, partitionId);
-      IPartitionManager partitionManager = pmFactory.create(
-          config, partitionId, stateStore, receiver);
-      partitionManagerMap.put(partitionId, partitionManager);
-      partitionManagers.add(partitionManager);
-    }
-  }
+        this.config = spoutConfig;
+        this.taskIndex = taskIndex;
+        this.totalTasks = totalTasks;
+        this.stateStore = stateStore;
+        List<String> partitionIds = calculateParititionIdsToOwn();
+        partitionManagerMap = new HashMap<String, IPartitionManager>();
+        partitionManagers = new ArrayList<IPartitionManager>();
 
-  @Override
-  public List<IPartitionManager> getMyPartitionManagers() {
-    return partitionManagers;
-  }
+        for (String partitionId : partitionIds) {
+            IEventHubReceiver receiver = recvFactory.create(config, partitionId);
+            IPartitionManager partitionManager = pmFactory.create(
+                config, partitionId, stateStore, receiver);
+            partitionManagerMap.put(partitionId, partitionManager);
+            partitionManagers.add(partitionManager);
+        }
+    }
 
-  @Override
-  public IPartitionManager getPartitionManager(String partitionId) {
-    return partitionManagerMap.get(partitionId);
-  }
+    @Override
+    public List<IPartitionManager> getMyPartitionManagers() {
+        return partitionManagers;
+    }
 
-  protected List<String> calculateParititionIdsToOwn() {
-    List<String> taskPartitions = new ArrayList<String>();
-    for (int i = this.taskIndex; i < config.getPartitionCount(); i += this.totalTasks) {
-      taskPartitions.add(Integer.toString(i));
-      logger.info(String.format("taskIndex %d owns partitionId %d.", this.taskIndex, i));
+    @Override
+    public IPartitionManager getPartitionManager(String partitionId) {
+        return partitionManagerMap.get(partitionId);
     }
 
-    return taskPartitions;
-  }
+    protected List<String> calculateParititionIdsToOwn() {
+        List<String> taskPartitions = new ArrayList<String>();
+        for (int i = this.taskIndex; i < config.getPartitionCount(); i += this.totalTasks) {
+            taskPartitions.add(Integer.toString(i));
+            logger.info(String.format("taskIndex %d owns partitionId %d.", this.taskIndex, i));
+        }
+
+        return taskPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
index a2024bd..5b6a2b9 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
@@ -15,60 +15,58 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.microsoft.azure.eventhubs.EventData;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
- * An Event Data Scheme which deserializes message payload into the Strings.
- * No encoding is assumed. The receiver will need to handle parsing of the 
- * string data in appropriate encoding.
+ * An Event Data Scheme which deserializes the message payload into a String. 
No encoding is assumed. The receiver will need to handle
+ * parsing of the string data in appropriate encoding.
+ *
+ * Note: Unlike other schemes provided, this scheme does not include any 
metadata.
  *
- * Note: Unlike other schemes provided, this scheme does not include any 
- * metadata. 
- * 
- * For metadata please refer to {@link BinaryEventDataScheme}, {@link 
EventDataScheme} 
+ * For metadata please refer to {@link BinaryEventDataScheme}, {@link 
EventDataScheme}
  */
 public class StringEventDataScheme implements IEventDataScheme {
 
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = 
LoggerFactory.getLogger(StringEventDataScheme.class);
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = 
LoggerFactory.getLogger(StringEventDataScheme.class);
 
-  @Override
-  public List<Object> deserialize(EventData eventData) {
-    final List<Object> fieldContents = new ArrayList<Object>();
-    String messageData = "";
-    if (eventData.getBytes()!=null) {
-      messageData = new String(eventData.getBytes());
-    }
-    /*Will only serialize AMQPValue type*/
-    else if (eventData.getObject()!=null) {
-      try {
-        if (!(eventData.getObject() instanceof List)) {
-          messageData = eventData.getObject().toString();
-        } else {
-          throw new RuntimeException("Cannot serialize the given AMQP type.");
+    @Override
+    public List<Object> deserialize(EventData eventData) {
+        final List<Object> fieldContents = new ArrayList<Object>();
+        String messageData = "";
+        if (eventData.getBytes() != null) {
+            messageData = new String(eventData.getBytes());
         }
-      } catch (RuntimeException e){
-        logger.error("Failed to serialize EventData payload class"
-                + eventData.getObject().getClass());
-        logger.error("Exception encountered while serializing EventData 
payload is"
-                + e.toString());
-        throw e;
-      }
+        /*Will only serialize AMQPValue type*/
+        else if (eventData.getObject() != null) {
+            try {
+                if (!(eventData.getObject() instanceof List)) {
+                    messageData = eventData.getObject().toString();
+                } else {
+                    throw new RuntimeException("Cannot serialize the given 
AMQP type.");
+                }
+            } catch (RuntimeException e) {
+                logger.error("Failed to serialize EventData payload class"
+                             + eventData.getObject().getClass());
+                logger.error("Exception encountered while serializing 
EventData payload is"
+                             + e.toString());
+                throw e;
+            }
+        }
+        fieldContents.add(messageData);
+        return fieldContents;
     }
-    fieldContents.add(messageData);
-    return fieldContents;
-  }
 
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(FieldConstants.Message);
-  }
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(FieldConstants.Message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
index 063aa4d..4e2ece9 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/ZookeeperStateStore.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import org.apache.curator.RetryPolicy;
@@ -25,71 +26,71 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZookeeperStateStore implements IStateStore {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = 
LoggerFactory.getLogger(ZookeeperStateStore.class);
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = 
LoggerFactory.getLogger(ZookeeperStateStore.class);
 
-  private final String zookeeperConnectionString;
-  private final CuratorFramework curatorFramework;
-  
-  public ZookeeperStateStore(String zookeeperConnectionString) {
-    this(zookeeperConnectionString, 3, 100);
-  }
+    private final String zookeeperConnectionString;
+    private final CuratorFramework curatorFramework;
 
-  public ZookeeperStateStore(String connectionString, int retries, int 
retryInterval) {
-    if (connectionString == null) {
-      zookeeperConnectionString = "localhost:2181";
-    } else {
-      zookeeperConnectionString = connectionString;
+    public ZookeeperStateStore(String zookeeperConnectionString) {
+        this(zookeeperConnectionString, 3, 100);
     }
 
-    RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval);
-    curatorFramework = 
CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
-  }
+    public ZookeeperStateStore(String connectionString, int retries, int 
retryInterval) {
+        if (connectionString == null) {
+            zookeeperConnectionString = "localhost:2181";
+        } else {
+            zookeeperConnectionString = connectionString;
+        }
 
-  @Override
-  public void open() {
-    curatorFramework.start();
-  }
+        RetryPolicy retryPolicy = new RetryNTimes(retries, retryInterval);
+        curatorFramework = 
CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+    }
 
-  @Override
-  public void close() {
-    curatorFramework.close();
-  }
+    @Override
+    public void open() {
+        curatorFramework.start();
+    }
+
+    @Override
+    public void close() {
+        curatorFramework.close();
+    }
 
-  @Override
-  public void saveData(String statePath, String data) {
-    data = data == null ? "" : data;
-    byte[] bytes = data.getBytes();
+    @Override
+    public void saveData(String statePath, String data) {
+        data = data == null ? "" : data;
+        byte[] bytes = data.getBytes();
 
-    try {
-      if (curatorFramework.checkExists().forPath(statePath) == null) {
-        curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, 
bytes);
-      } else {
-        curatorFramework.setData().forPath(statePath, bytes);
-      }
+        try {
+            if (curatorFramework.checkExists().forPath(statePath) == null) {
+                
curatorFramework.create().creatingParentsIfNeeded().forPath(statePath, bytes);
+            } else {
+                curatorFramework.setData().forPath(statePath, bytes);
+            }
 
-      logger.info(String.format("data was saved. path: %s, data: %s.", 
statePath, data));
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+            logger.info(String.format("data was saved. path: %s, data: %s.", 
statePath, data));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
-  }
 
-  @Override
-  public String readData(String statePath) {
-    try {
-      if (curatorFramework.checkExists().forPath(statePath) == null) {
-        // do we want to throw an exception if path doesn't exist??
-        return null;
-      } else {
-        byte[] bytes = curatorFramework.getData().forPath(statePath);
-        String data = new String(bytes);
+    @Override
+    public String readData(String statePath) {
+        try {
+            if (curatorFramework.checkExists().forPath(statePath) == null) {
+                // do we want to throw an exception if path doesn't exist??
+                return null;
+            } else {
+                byte[] bytes = curatorFramework.getData().forPath(statePath);
+                String data = new String(bytes);
 
-        logger.info(String.format("data was retrieved. path: %s, data: %s.", 
statePath, data));
+                logger.info(String.format("data was retrieved. path: %s, data: 
%s.", statePath, data));
 
-        return data;
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+                return data;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
index 253b317..b87c81b 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Coordinator.java
@@ -15,46 +15,45 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+public class Coordinator implements 
IPartitionedTridentSpout.Coordinator<Partitions>,
+                                    
IOpaquePartitionedTridentSpout.Coordinator<Partitions> {
+    private static final Logger logger = 
LoggerFactory.getLogger(Coordinator.class);
+    private final EventHubSpoutConfig spoutConfig;
+    Partitions partitions;
 
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+    public Coordinator(EventHubSpoutConfig spoutConfig) {
+        this.spoutConfig = spoutConfig;
+    }
 
-public class Coordinator implements 
IPartitionedTridentSpout.Coordinator<Partitions>,
-    IOpaquePartitionedTridentSpout.Coordinator<Partitions> {
-  private static final Logger logger = 
LoggerFactory.getLogger(Coordinator.class);
-  private final EventHubSpoutConfig spoutConfig;
-  Partitions partitions;
-  
-  public Coordinator(EventHubSpoutConfig spoutConfig) {
-    this.spoutConfig = spoutConfig;
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public Partitions getPartitionsForBatch() {
-    if(partitions != null) {
-      return partitions;
+    @Override
+    public void close() {
     }
-    
-    partitions = new Partitions();
-    for(int i=0; i<spoutConfig.getPartitionCount(); ++i) {
-      partitions.addPartition(new Partition(spoutConfig, Integer.toString(i)));
+
+    @Override
+    public Partitions getPartitionsForBatch() {
+        if (partitions != null) {
+            return partitions;
+        }
+
+        partitions = new Partitions();
+        for (int i = 0; i < spoutConfig.getPartitionCount(); ++i) {
+            partitions.addPartition(new Partition(spoutConfig, 
Integer.toString(i)));
+        }
+        logger.info("created partitions, size=" + 
spoutConfig.getPartitionCount());
+        return partitions;
+    }
+
+    @Override
+    public boolean isReady(long txid) {
+        return true;
     }
-    logger.info("created partitions, size=" + spoutConfig.getPartitionCount());
-    return partitions;
-  }
-
-  @Override
-  public boolean isReady(long txid) {
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
index d1e8b9e..922e334 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java
@@ -15,21 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
-package org.apache.storm.eventhubs.trident;
 
-import org.apache.storm.eventhubs.spout.EventDataWrap;
+package org.apache.storm.eventhubs.trident;
 
 import java.util.List;
+import org.apache.storm.eventhubs.spout.EventDataWrap;
 
 public interface ITridentPartitionManager {
-  boolean open(String offset);
-  void close();
-  
-  /**
-   * receive a batch of messages from EvenHub up to "count" messages
-   * @param offset the starting offset
-   * @param count max number of messages in this batch
-   * @return list of EventData, if failed to receive, return empty list
-   */
-  public List<EventDataWrap> receiveBatch(String offset, int count);
+    boolean open(String offset);
+
+    void close();
+
+    /**
+     * receive a batch of messages from EventHub up to "count" messages
+     *
+     * @param offset the starting offset
+     * @param count  max number of messages in this batch
+     * @return list of EventData, if failed to receive, return empty list
+     */
+    public List<EventDataWrap> receiveBatch(String offset, int count);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
index 701bd46..15a83cc 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManagerFactory.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
 import java.io.Serializable;
-
 import org.apache.storm.eventhubs.spout.IEventHubReceiver;
 
 public interface ITridentPartitionManagerFactory extends Serializable {
-  ITridentPartitionManager create(IEventHubReceiver receiver);
+    ITridentPartitionManager create(IEventHubReceiver receiver);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
index 0da421c..4020b21 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -15,54 +15,55 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.trident.topology.TransactionAttempt;
 
-import java.util.List;
-import java.util.Map;
-
 /**
  * A thin wrapper of TransactionalTridentEventHubEmitter for 
OpaqueTridentEventHubSpout
  */
 public class OpaqueTridentEventHubEmitter implements 
IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> {
-  private final TransactionalTridentEventHubEmitter transactionalEmitter;
-  public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
-    transactionalEmitter = new 
TransactionalTridentEventHubEmitter(spoutConfig);
-  }
-  
-  public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
-      int batchSize,
-      ITridentPartitionManagerFactory pmFactory,
-      IEventHubReceiverFactory recvFactory) {
-    transactionalEmitter = new TransactionalTridentEventHubEmitter(spoutConfig,
-        batchSize,
-        pmFactory,
-        recvFactory);
-  }
+    private final TransactionalTridentEventHubEmitter transactionalEmitter;
+
+    public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
+        transactionalEmitter = new 
TransactionalTridentEventHubEmitter(spoutConfig);
+    }
+
+    public OpaqueTridentEventHubEmitter(EventHubSpoutConfig spoutConfig,
+                                        int batchSize,
+                                        ITridentPartitionManagerFactory 
pmFactory,
+                                        IEventHubReceiverFactory recvFactory) {
+        transactionalEmitter = new 
TransactionalTridentEventHubEmitter(spoutConfig,
+                                                                       
batchSize,
+                                                                       
pmFactory,
+                                                                       
recvFactory);
+    }
 
-  @Override
-  public void close() {
-    transactionalEmitter.close();
-  }
+    @Override
+    public void close() {
+        transactionalEmitter.close();
+    }
 
-  @Override
-  public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector 
collector,
-      Partition partition, Map meta) {
-    return transactionalEmitter.emitPartitionBatchNew(attempt, collector, 
partition, meta);
-  }
+    @Override
+    public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector 
collector,
+                                  Partition partition, Map meta) {
+        return transactionalEmitter.emitPartitionBatchNew(attempt, collector, 
partition, meta);
+    }
 
-  @Override
-  public List<Partition> getOrderedPartitions(Partitions partitions) {
-    return transactionalEmitter.getOrderedPartitions(partitions);
-  }
+    @Override
+    public List<Partition> getOrderedPartitions(Partitions partitions) {
+        return transactionalEmitter.getOrderedPartitions(partitions);
+    }
 
-  @Override
-  public void refreshPartitions(List<Partition> partitionList) {
-    transactionalEmitter.refreshPartitions(partitionList);
-  }
+    @Override
+    public void refreshPartitions(List<Partition> partitionList) {
+        transactionalEmitter.refreshPartitions(partitionList);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
index 7123304..f062201 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubSpout.java
@@ -15,50 +15,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
 import java.util.Map;
-
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventDataScheme;
-
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
 
 /**
  * Opaque Trident EventHubs Spout
  */
 public class OpaqueTridentEventHubSpout implements 
IOpaquePartitionedTridentSpout<Partitions, Partition, Map> {
-  private static final long serialVersionUID = 1L;
-  private final IEventDataScheme scheme;
-  private final EventHubSpoutConfig spoutConfig;
-
-  public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
-    spoutConfig = config;
-    scheme = spoutConfig.getEventDataScheme();
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  @Override
-  public IOpaquePartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
-      Map<String, Object> conf, TopologyContext context) {
-    return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
-  }
-
-  @Override
-  public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> 
getEmitter(
-      Map<String, Object> conf, TopologyContext context) {
-    return new OpaqueTridentEventHubEmitter(spoutConfig);
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return scheme.getOutputFields();
-  }
+    private static final long serialVersionUID = 1L;
+    private final IEventDataScheme scheme;
+    private final EventHubSpoutConfig spoutConfig;
+
+    public OpaqueTridentEventHubSpout(EventHubSpoutConfig config) {
+        spoutConfig = config;
+        scheme = spoutConfig.getEventDataScheme();
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public IOpaquePartitionedTridentSpout.Coordinator<Partitions> 
getCoordinator(
+        Map<String, Object> conf, TopologyContext context) {
+        return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+    }
+
+    @Override
+    public IOpaquePartitionedTridentSpout.Emitter<Partitions, Partition, Map> 
getEmitter(
+        Map<String, Object> conf, TopologyContext context) {
+        return new OpaqueTridentEventHubEmitter(spoutConfig);
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return scheme.getOutputFields();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
index 8857eec..44d9d23 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partition.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
 import java.io.Serializable;
@@ -25,15 +26,15 @@ import org.apache.storm.trident.spout.ISpoutPartition;
  * Represents an EventHub partition
  */
 public class Partition implements ISpoutPartition, Serializable {
-  private static final long serialVersionUID = 1L;
-  String partitionId;
-  
-  public Partition(EventHubSpoutConfig config, String partitionId) {
-    this.partitionId = partitionId;
-  }
-  
-  @Override
-  public String getId() {
-    return partitionId;
-  }
+    private static final long serialVersionUID = 1L;
+    String partitionId;
+
+    public Partition(EventHubSpoutConfig config, String partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    @Override
+    public String getId() {
+        return partitionId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
index c3317d9..375f39d 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/Partitions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
 import java.io.Serializable;
@@ -25,17 +26,18 @@ import java.util.List;
  * Represents all EventHub partitions a spout is receiving messages from.
  */
 public class Partitions implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private List<Partition> partitionList;
-  public Partitions() {
-    partitionList = new ArrayList<Partition>();
-  }
-  
-  public void addPartition(Partition partition) {
-    partitionList.add(partition);
-  }
-  
-  public List<Partition> getPartitions() {
-    return partitionList;
-  }
+    private static final long serialVersionUID = 1L;
+    private List<Partition> partitionList;
+
+    public Partitions() {
+        partitionList = new ArrayList<Partition>();
+    }
+
+    public void addPartition(Partition partition) {
+        partitionList.add(partition);
+    }
+
+    public List<Partition> getPartitions() {
+        return partitionList;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
index d3456e0..8bd7bb1 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
@@ -15,145 +15,149 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
-import org.apache.storm.eventhubs.spout.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.eventhubs.spout.EventDataWrap;
+import org.apache.storm.eventhubs.spout.EventHubReceiverImpl;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.FieldConstants;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.IPartitionedTridentSpout;
 import org.apache.storm.trident.topology.TransactionAttempt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 
 public class TransactionalTridentEventHubEmitter
     implements IPartitionedTridentSpout.Emitter<Partitions, Partition, 
Map<String, Object>> {
-  private static final Logger logger = 
LoggerFactory.getLogger(TransactionalTridentEventHubEmitter.class);
-  private final int batchSize; 
-  private final EventHubSpoutConfig spoutConfig;
-  private Map<String, ITridentPartitionManager> pmMap;
-  private ITridentPartitionManagerFactory pmFactory;
-  private IEventHubReceiverFactory recvFactory;
-
-  public TransactionalTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
-    //use batch size that matches the default credit size
-    this(spoutConfig, spoutConfig.getReceiverCredits(), null, null);
-  }
-      
-  public TransactionalTridentEventHubEmitter(final EventHubSpoutConfig 
spoutConfig,
-      int batchSize,
-      ITridentPartitionManagerFactory pmFactory,
-      IEventHubReceiverFactory recvFactory) {
-    this.spoutConfig = spoutConfig;
-    this.batchSize = batchSize;
-    this.pmFactory = pmFactory;
-    this.recvFactory = recvFactory;
-    pmMap = new HashMap<String, ITridentPartitionManager>();
-    if(this.pmFactory == null) {
-      this.pmFactory = new ITridentPartitionManagerFactory() {
-        @Override
-        public ITridentPartitionManager create(IEventHubReceiver receiver) {
-          return new TridentPartitionManager(spoutConfig, receiver);
-        }
-      };
+    private static final Logger logger = 
LoggerFactory.getLogger(TransactionalTridentEventHubEmitter.class);
+    private final int batchSize;
+    private final EventHubSpoutConfig spoutConfig;
+    private Map<String, ITridentPartitionManager> pmMap;
+    private ITridentPartitionManagerFactory pmFactory;
+    private IEventHubReceiverFactory recvFactory;
+
+    public TransactionalTridentEventHubEmitter(EventHubSpoutConfig 
spoutConfig) {
+        //use batch size that matches the default credit size
+        this(spoutConfig, spoutConfig.getReceiverCredits(), null, null);
     }
-    if(this.recvFactory == null) {
-      this.recvFactory = new IEventHubReceiverFactory() {
-        @Override
-        public IEventHubReceiver create(EventHubSpoutConfig config,
-            String partitionId) {
-          return new EventHubReceiverImpl(config, partitionId);
+
+    public TransactionalTridentEventHubEmitter(final EventHubSpoutConfig 
spoutConfig,
+                                               int batchSize,
+                                               ITridentPartitionManagerFactory 
pmFactory,
+                                               IEventHubReceiverFactory 
recvFactory) {
+        this.spoutConfig = spoutConfig;
+        this.batchSize = batchSize;
+        this.pmFactory = pmFactory;
+        this.recvFactory = recvFactory;
+        pmMap = new HashMap<String, ITridentPartitionManager>();
+        if (this.pmFactory == null) {
+            this.pmFactory = new ITridentPartitionManagerFactory() {
+                @Override
+                public ITridentPartitionManager create(IEventHubReceiver 
receiver) {
+                    return new TridentPartitionManager(spoutConfig, receiver);
+                }
+            };
+        }
+        if (this.recvFactory == null) {
+            this.recvFactory = new IEventHubReceiverFactory() {
+                @Override
+                public IEventHubReceiver create(EventHubSpoutConfig config,
+                                                String partitionId) {
+                    return new EventHubReceiverImpl(config, partitionId);
+                }
+            };
         }
-      };
-    }
-  }
-  
-  @Override
-  public void close() {
-    for(ITridentPartitionManager pm: pmMap.values()) {
-      pm.close();
-    }
-  }
-  
-  /**
-   * Check if partition manager for a given partiton is created
-   * if not, create it.
-   * @param partition
-   */
-  private ITridentPartitionManager getOrCreatePartitionManager(Partition 
partition) {
-    ITridentPartitionManager pm;
-    if(!pmMap.containsKey(partition.getId())) {
-      IEventHubReceiver receiver = recvFactory.create(spoutConfig, 
partition.getId());
-      pm = pmFactory.create(receiver);
-      pmMap.put(partition.getId(), pm);
     }
-    else {
-      pm = pmMap.get(partition.getId());
+
+    @Override
+    public void close() {
+        for (ITridentPartitionManager pm : pmMap.values()) {
+            pm.close();
+        }
     }
-    return pm;
-  }
-
-  @Override
-  public void emitPartitionBatch(TransactionAttempt attempt,
-      TridentCollector collector, Partition partition, Map<String, Object> 
meta) {
-    String offset = (String)meta.get("offset");
-    int count = Integer.parseInt((String)meta.get("count"));
-    logger.info("re-emit for partition " + partition.getId() + ", offset=" + 
offset + ", count=" + count);
-    ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
-    List<EventDataWrap> listEvents = pm.receiveBatch(offset, count);
-    if(listEvents.size() != count) {
-      logger.error("failed to refetch eventhub messages, new count=" + 
listEvents.size());
-      return;
+
+    /**
+     * Check if partition manager for a given partition is created; if not, 
create it.
+     *
+     * @param partition the partition whose id is used to look up or create the manager
+     */
+    private ITridentPartitionManager getOrCreatePartitionManager(Partition 
partition) {
+        ITridentPartitionManager pm;
+        if (!pmMap.containsKey(partition.getId())) {
+            IEventHubReceiver receiver = recvFactory.create(spoutConfig, 
partition.getId());
+            pm = pmFactory.create(receiver);
+            pmMap.put(partition.getId(), pm);
+        } else {
+            pm = pmMap.get(partition.getId());
+        }
+        return pm;
     }
 
-    for(EventDataWrap ed: listEvents) {
-      List<Object> tuples = 
-          spoutConfig.getEventDataScheme().deserialize(ed.getEventData());
-      collector.emit(tuples);
+    @Override
+    public void emitPartitionBatch(TransactionAttempt attempt,
+                                   TridentCollector collector, Partition 
partition, Map<String, Object> meta) {
+        String offset = (String) meta.get("offset");
+        int count = Integer.parseInt((String) meta.get("count"));
+        logger.info("re-emit for partition " + partition.getId() + ", offset=" 
+ offset + ", count=" + count);
+        ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
+        List<EventDataWrap> listEvents = pm.receiveBatch(offset, count);
+        if (listEvents.size() != count) {
+            logger.error("failed to refetch eventhub messages, new count=" + 
listEvents.size());
+            return;
+        }
+
+        for (EventDataWrap ed : listEvents) {
+            List<Object> tuples =
+                
spoutConfig.getEventDataScheme().deserialize(ed.getEventData());
+            collector.emit(tuples);
+        }
     }
-  }
-
-  @Override
-  public Map<String, Object> emitPartitionBatchNew(TransactionAttempt attempt,
-      TridentCollector collector, Partition partition, Map<String, Object> 
meta) {
-    ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
-    String offset = FieldConstants.DefaultStartingOffset;
-    if(meta != null && meta.containsKey("nextOffset")) {
-      offset = (String)meta.get("nextOffset");
+
+    @Override
+    public Map<String, Object> emitPartitionBatchNew(TransactionAttempt 
attempt,
+                                                     TridentCollector 
collector, Partition partition, Map<String, Object> meta) {
+        ITridentPartitionManager pm = getOrCreatePartitionManager(partition);
+        String offset = FieldConstants.DefaultStartingOffset;
+        if (meta != null && meta.containsKey("nextOffset")) {
+            offset = (String) meta.get("nextOffset");
+        }
+        //logger.info("emit for partition " + partition.getId() + ", offset=" 
+ offset);
+        String nextOffset = offset;
+
+        List<EventDataWrap> listEvents = pm.receiveBatch(offset, batchSize);
+
+        for (EventDataWrap ed : listEvents) {
+            //update nextOffset;
+            nextOffset = ed.getMessageId().getOffset();
+            List<Object> tuples =
+                
spoutConfig.getEventDataScheme().deserialize(ed.getEventData());
+            collector.emit(tuples);
+        }
+        //logger.info("emitted new batches: " + listEvents.size());
+
+        Map<String, Object> newMeta = new HashMap<>();
+        newMeta.put("offset", offset);
+        newMeta.put("nextOffset", nextOffset);
+        newMeta.put("count", "" + listEvents.size());
+        return newMeta;
     }
-    //logger.info("emit for partition " + partition.getId() + ", offset=" + 
offset);
-    String nextOffset = offset;
 
-    List<EventDataWrap> listEvents = pm.receiveBatch(offset, batchSize);
+    @Override
+    public List<Partition> getOrderedPartitions(Partitions partitions) {
+        return partitions.getPartitions();
+    }
 
-    for(EventDataWrap ed: listEvents) {
-      //update nextOffset;
-      nextOffset = ed.getMessageId().getOffset();
-      List<Object> tuples = 
-          spoutConfig.getEventDataScheme().deserialize(ed.getEventData());
-      collector.emit(tuples);
+    @Override
+    public void refreshPartitions(List<Partition> partitionList) {
+        //partition info does not change in EventHub
+        return;
     }
-    //logger.info("emitted new batches: " + listEvents.size());
-    
-    Map<String, Object> newMeta = new HashMap<>();
-    newMeta.put("offset", offset);
-    newMeta.put("nextOffset", nextOffset);
-    newMeta.put("count", ""+listEvents.size());
-    return newMeta;
-  }
-
-  @Override
-  public List<Partition> getOrderedPartitions(Partitions partitions) {
-    return partitions.getPartitions();
-  }
-
-  @Override
-  public void refreshPartitions(List<Partition> partitionList) {
-    //partition info does not change in EventHub
-    return;
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
index ee3242a..a2eb73d 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubSpout.java
@@ -15,52 +15,50 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.trident;
 
 import java.util.Map;
-
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventDataScheme;
-
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.eventhubs.trident.Partition;
+import org.apache.storm.tuple.Fields;
 
 /**
  * Transactional Trident EventHub Spout
  */
-public class TransactionalTridentEventHubSpout implements 
-  IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> {
-  private static final long serialVersionUID = 1L;
-  private final IEventDataScheme scheme;
-  private final EventHubSpoutConfig spoutConfig;
-  
-  public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
-    spoutConfig = config;
-    scheme = spoutConfig.getEventDataScheme();
-  }
-  
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
+public class TransactionalTridentEventHubSpout implements
+                                               
IPartitionedTridentSpout<Partitions, Partition, Map<String, Object>> {
+    private static final long serialVersionUID = 1L;
+    private final IEventDataScheme scheme;
+    private final EventHubSpoutConfig spoutConfig;
+
+    public TransactionalTridentEventHubSpout(EventHubSpoutConfig config) {
+        spoutConfig = config;
+        scheme = spoutConfig.getEventDataScheme();
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
 
-  @Override
-  public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
-      Map<String, Object> conf, TopologyContext context) {
-    return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
-  }
+    @Override
+    public IPartitionedTridentSpout.Coordinator<Partitions> getCoordinator(
+        Map<String, Object> conf, TopologyContext context) {
+        return new org.apache.storm.eventhubs.trident.Coordinator(spoutConfig);
+    }
 
-  @Override
-  public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, 
Object>> getEmitter(
-      Map<String, Object> conf, TopologyContext context) {
-    return new TransactionalTridentEventHubEmitter(spoutConfig);
-  }
+    @Override
+    public IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, 
Object>> getEmitter(
+        Map<String, Object> conf, TopologyContext context) {
+        return new TransactionalTridentEventHubEmitter(spoutConfig);
+    }
 
-  @Override
-  public Fields getOutputFields() {
-    return scheme.getOutputFields();
-  }
+    @Override
+    public Fields getOutputFields() {
+        return scheme.getOutputFields();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
index a384667..550e106 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
@@ -15,72 +15,75 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
-package org.apache.storm.eventhubs.trident;
 
-import org.apache.storm.eventhubs.spout.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.eventhubs.trident;
 
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.storm.eventhubs.spout.EventDataWrap;
+import org.apache.storm.eventhubs.spout.EventHubException;
+import org.apache.storm.eventhubs.spout.EventHubFilter;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import org.apache.storm.eventhubs.spout.FieldConstants;
+import org.apache.storm.eventhubs.spout.IEventHubReceiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TridentPartitionManager implements ITridentPartitionManager {
-  private static final Logger logger = 
LoggerFactory.getLogger(TridentPartitionManager.class);
-  private final int receiveTimeoutMs = 5000;
-  private final IEventHubReceiver receiver;
-  private final EventHubSpoutConfig spoutConfig;
-  private String lastOffset = FieldConstants.DefaultStartingOffset;
-  
-  public TridentPartitionManager(EventHubSpoutConfig spoutConfig, 
IEventHubReceiver receiver) {
-    this.receiver = receiver;
-    this.spoutConfig = spoutConfig;
-  }
-  
-  @Override
-  public boolean open(String offset) {
-    try {
-      if((offset == null || 
offset.equals(FieldConstants.DefaultStartingOffset))
-        && spoutConfig.getEnqueueTimeFilter() != 0) {
-          receiver.open(new 
EventHubFilter(Instant.ofEpochMilli(spoutConfig.getEnqueueTimeFilter())));
-      }
-      else {
-        receiver.open(new EventHubFilter(offset));
-      }
-      lastOffset = offset;
-      return true;
+    private static final Logger logger = 
LoggerFactory.getLogger(TridentPartitionManager.class);
+    private final int receiveTimeoutMs = 5000;
+    private final IEventHubReceiver receiver;
+    private final EventHubSpoutConfig spoutConfig;
+    private String lastOffset = FieldConstants.DefaultStartingOffset;
+
+    public TridentPartitionManager(EventHubSpoutConfig spoutConfig, 
IEventHubReceiver receiver) {
+        this.receiver = receiver;
+        this.spoutConfig = spoutConfig;
     }
-    catch(EventHubException ex) {
-      logger.error("failed to open eventhub receiver: " + ex.getMessage());
-      return false;
+
+    @Override
+    public boolean open(String offset) {
+        try {
+            if ((offset == null || 
offset.equals(FieldConstants.DefaultStartingOffset))
+                && spoutConfig.getEnqueueTimeFilter() != 0) {
+                receiver.open(new 
EventHubFilter(Instant.ofEpochMilli(spoutConfig.getEnqueueTimeFilter())));
+            } else {
+                receiver.open(new EventHubFilter(offset));
+            }
+            lastOffset = offset;
+            return true;
+        } catch (EventHubException ex) {
+            logger.error("failed to open eventhub receiver: " + 
ex.getMessage());
+            return false;
+        }
     }
-  }
-  
-  @Override
-  public void close() {
-    receiver.close();
-  }
-  
-  @Override
-  public List<EventDataWrap> receiveBatch(String offset, int count) {
-    List<EventDataWrap> batch = new ArrayList<EventDataWrap>(count);
-    if(!offset.equals(lastOffset) || !receiver.isOpen()) {
-      //re-establish connection to eventhub servers using the right offset
-      //TBD: might be optimized with cache.
-      close();
-      if(!open(offset)) {
-        return batch;
-      }
+
+    @Override
+    public void close() {
+        receiver.close();
     }
-    
-    for(int i=0; i<count; ++i) {
-      EventDataWrap ed = receiver.receive();
-      if(ed == null) {
-        break;
-      }
-      batch.add(ed);
-      lastOffset = ed.getMessageId().getOffset();
+
+    @Override
+    public List<EventDataWrap> receiveBatch(String offset, int count) {
+        List<EventDataWrap> batch = new ArrayList<EventDataWrap>(count);
+        if (!offset.equals(lastOffset) || !receiver.isOpen()) {
+            //re-establish connection to eventhub servers using the right 
offset
+            //TBD: might be optimized with cache.
+            close();
+            if (!open(offset)) {
+                return batch;
+            }
+        }
+
+        for (int i = 0; i < count; ++i) {
+            EventDataWrap ed = receiver.receive();
+            if (ed == null) {
+                break;
+            }
+            batch.add(ed);
+            lastOffset = ed.getMessageId().getOffset();
+        }
+        return batch;
     }
-    return batch;
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
index b0dd33a..2c8ef2e 100755
--- 
a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
+++ 
b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/samples/AtMostOnceEventCount.java
@@ -15,10 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  
*******************************************************************************/
+
 package org.apache.storm.eventhubs.samples;
 
 import java.io.Serializable;
-
 import org.apache.storm.eventhubs.spout.EventHubSpout;
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventHubReceiver;
@@ -28,27 +28,27 @@ import org.apache.storm.eventhubs.spout.IStateStore;
 import org.apache.storm.eventhubs.spout.SimplePartitionManager;
 
 public class AtMostOnceEventCount extends EventCount implements Serializable {
-  @Override
-  protected EventHubSpout createEventHubSpout() {
-    IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() {
-      private static final long serialVersionUID = 1L;
+    public static void main(String[] args) throws Exception {
+        AtMostOnceEventCount scenario = new AtMostOnceEventCount();
+
+        scenario.runScenario(args);
+    }
 
-      @Override
-      public IPartitionManager create(EventHubSpoutConfig spoutConfig,
-          String partitionId, IStateStore stateStore,
-          IEventHubReceiver receiver) {
-        return new SimplePartitionManager(spoutConfig, partitionId,
-            stateStore, receiver);
-      }
-    };
-    EventHubSpout eventHubSpout = new EventHubSpout(
-        spoutConfig, null, pmFactory, null);
-    return eventHubSpout;
-  }
-  
-  public static void main(String[] args) throws Exception {
-    AtMostOnceEventCount scenario = new AtMostOnceEventCount();
+    @Override
+    protected EventHubSpout createEventHubSpout() {
+        IPartitionManagerFactory pmFactory = new IPartitionManagerFactory() {
+            private static final long serialVersionUID = 1L;
 
-    scenario.runScenario(args);
-  }
+            @Override
+            public IPartitionManager create(EventHubSpoutConfig spoutConfig,
+                                            String partitionId, IStateStore 
stateStore,
+                                            IEventHubReceiver receiver) {
+                return new SimplePartitionManager(spoutConfig, partitionId,
+                                                  stateStore, receiver);
+            }
+        };
+        EventHubSpout eventHubSpout = new EventHubSpout(
+            spoutConfig, null, pmFactory, null);
+        return eventHubSpout;
+    }
 }

Reply via email to