Fixing stylecheck problems with storm-eventhubs

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/18723171
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/18723171
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/18723171

Branch: refs/heads/master
Commit: 18723171612bdfe818929297378433a3c069e4e7
Parents: fc1cf09
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 21:35:57 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 00:22:17 2018 -0400

----------------------------------------------------------------------
 external/storm-eventhubs/pom.xml                |   2 +-
 .../eventhubs/bolt/DefaultEventDataFormat.java  |  38 +-
 .../storm/eventhubs/bolt/EventHubBolt.java      | 208 +++++----
 .../eventhubs/bolt/EventHubBoltConfig.java      | 147 +++---
 .../storm/eventhubs/bolt/IEventDataFormat.java  |   3 +-
 .../eventhubs/spout/BinaryEventDataScheme.java  |  78 ++--
 .../storm/eventhubs/spout/EventDataScheme.java  |  92 ++--
 .../storm/eventhubs/spout/EventDataWrap.java    |  51 ++-
 .../storm/eventhubs/spout/EventHubFilter.java   |  11 +-
 .../eventhubs/spout/EventHubReceiverImpl.java   | 237 +++++-----
 .../storm/eventhubs/spout/EventHubSpout.java    | 420 ++++++++---------
 .../eventhubs/spout/EventHubSpoutConfig.java    | 451 +++++++++----------
 .../storm/eventhubs/spout/FieldConstants.java   |  13 +-
 .../storm/eventhubs/spout/IEventDataScheme.java |  34 +-
 .../eventhubs/spout/IEventHubReceiver.java      |  11 +-
 .../spout/IEventHubReceiverFactory.java         |  27 +-
 .../eventhubs/spout/IPartitionCoordinator.java  |   5 +-
 .../eventhubs/spout/IPartitionManager.java      |  17 +-
 .../spout/IPartitionManagerFactory.java         |  33 +-
 .../storm/eventhubs/spout/IStateStore.java      |   9 +-
 .../apache/storm/eventhubs/spout/MessageId.java |  69 +--
 .../storm/eventhubs/spout/PartitionManager.java | 126 +++---
 .../eventhubs/spout/SimplePartitionManager.java | 200 ++++----
 .../spout/StaticPartitionCoordinator.java       |  92 ++--
 .../eventhubs/spout/StringEventDataScheme.java  |  78 ++--
 .../eventhubs/spout/ZookeeperStateStore.java    | 107 ++---
 .../storm/eventhubs/trident/Coordinator.java    |  65 ++-
 .../trident/ITridentPartitionManager.java       |  26 +-
 .../ITridentPartitionManagerFactory.java        |   4 +-
 .../trident/OpaqueTridentEventHubEmitter.java   |  69 +--
 .../trident/OpaqueTridentEventHubSpout.java     |  65 ++-
 .../storm/eventhubs/trident/Partition.java      |  23 +-
 .../storm/eventhubs/trident/Partitions.java     |  28 +-
 .../TransactionalTridentEventHubEmitter.java    | 240 +++++-----
 .../TransactionalTridentEventHubSpout.java      |  64 ++-
 .../trident/TridentPartitionManager.java        | 117 ++---
 .../eventhubs/samples/AtMostOnceEventCount.java |  44 +-
 .../storm/eventhubs/samples/EventCount.java     | 208 +++++----
 .../storm/eventhubs/samples/EventHubLoop.java   |  44 +-
 .../samples/OpaqueTridentEventCount.java        |  45 +-
 .../samples/TransactionalTridentEventCount.java |  86 ++--
 .../eventhubs/samples/bolt/GlobalCountBolt.java |  95 ++--
 .../samples/bolt/PartialCountBolt.java          |  59 ++-
 .../eventhubs/spout/EventHubReceiverMock.java   |  97 ++--
 .../spout/EventHubSpoutCallerMock.java          | 130 +++---
 .../spout/PartitionManagerCallerMock.java       | 134 +++---
 .../spout/SpoutOutputCollectorMock.java         |  76 ++--
 .../storm/eventhubs/spout/StateStoreMock.java   |  44 +-
 .../storm/eventhubs/spout/TestEventData.java    |  29 +-
 .../eventhubs/spout/TestEventHubSpout.java      |  82 ++--
 .../eventhubs/spout/TestPartitionManager.java   | 178 ++++----
 .../TestTransactionalTridentEmitter.java        | 123 +++--
 .../eventhubs/trident/TridentCollectorMock.java |  58 +--
 53 files changed, 2386 insertions(+), 2406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 79b16a9..0c7d3a8 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -53,7 +53,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>1765</maxAllowedViolations>
+                    <maxAllowedViolations>45</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
index d6e1dbc..094ab9a 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java
@@ -15,33 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.bolt;
 
 import org.apache.storm.tuple.Tuple;
 
 /**
- * A default implementation of IEventDataFormat that converts the tuple
- * into a delimited string.
+ * A default implementation of IEventDataFormat that converts the tuple into a delimited string.
  */
 public class DefaultEventDataFormat implements IEventDataFormat {
-  private static final long serialVersionUID = 1L;
-  private String delimiter = ",";
-  
-  public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
-    this.delimiter = delimiter;
-    return this;
-  }
+    private static final long serialVersionUID = 1L;
+    private String delimiter = ",";
+
+    public DefaultEventDataFormat withFieldDelimiter(String delimiter) {
+        this.delimiter = delimiter;
+        return this;
+    }
 
-  @Override
-  public byte[] serialize(Tuple tuple) {
-    StringBuilder sb = new StringBuilder();
-    for(Object obj : tuple.getValues()) {
-      if(sb.length() != 0) {
-        sb.append(delimiter);
-      }
-      sb.append(obj.toString());
+    @Override
+    public byte[] serialize(Tuple tuple) {
+        StringBuilder sb = new StringBuilder();
+        for (Object obj : tuple.getValues()) {
+            if (sb.length() != 0) {
+                sb.append(delimiter);
+            }
+            sb.append(obj.toString());
+        }
+        return sb.toString().getBytes();
     }
-    return sb.toString().getBytes();
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index 7d1aeab..85ffd03 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.bolt;
 
 
@@ -22,6 +23,8 @@ import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.PartitionSender;
 import com.microsoft.azure.servicebus.ServiceBusException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import org.apache.storm.eventhubs.spout.EventHubException;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -31,116 +34,107 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
 /**
  * A bolt that writes event message to EventHub.
  */
 public class EventHubBolt extends BaseRichBolt {
-       private static final long serialVersionUID = 1L;
-       private static final Logger logger = LoggerFactory
-                       .getLogger(EventHubBolt.class);
-
-       protected OutputCollector collector;
-       protected PartitionSender sender;
-       protected EventHubClient ehClient;
-       protected EventHubBoltConfig boltConfig;
-
-       public EventHubBolt(String connectionString, String entityPath) {
-               boltConfig = new EventHubBoltConfig(connectionString, 
entityPath);
-       }
-
-       public EventHubBolt(String userName, String password, String namespace,
-                       String entityPath, boolean partitionMode) {
-               boltConfig = new EventHubBoltConfig(userName, password, 
namespace,
-                               entityPath, partitionMode);
-       }
-
-       public EventHubBolt(EventHubBoltConfig config) {
-               boltConfig = config;
-       }
-
-       @Override
-       public void prepare(Map<String, Object> config, TopologyContext context,
-                       OutputCollector collector) {
-               this.collector = collector;
-               String myPartitionId = null;
-               if (boltConfig.getPartitionMode()) {
-                       // We can use the task index (starting from 0) as the 
partition ID
-                       myPartitionId = "" + context.getThisTaskIndex();
-               }
-               logger.info("creating sender: " + 
boltConfig.getConnectionString()
-                               + ", " + boltConfig.getEntityPath() + ", " + 
myPartitionId);
-               try {
-                       ehClient = 
EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
-                       if (boltConfig.getPartitionMode()) {
-                               sender = 
ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
-                       }
-               } catch (Exception ex) {
-                       collector.reportError(ex);
-                       throw new RuntimeException(ex);
-               }
-
-       }
-
-       @Override
-       public void execute(Tuple tuple) {
-               try {
-                       EventData sendEvent = new 
EventData(boltConfig.getEventDataFormat().serialize(tuple));
-                       if (boltConfig.getPartitionMode() && sender!=null) {
-                               sender.sendSync(sendEvent);
-                       }
-                       else if (boltConfig.getPartitionMode() && sender==null) 
{
-                               throw new EventHubException("Sender is null");
-                       }
-                       else if (!boltConfig.getPartitionMode() && 
ehClient!=null) {
-                               ehClient.sendSync(sendEvent);
-                       }
-                       else if (!boltConfig.getPartitionMode() && 
ehClient==null) {
-                               throw new EventHubException("ehclient is null");
-                       }
-                       collector.ack(tuple);
-               } catch (EventHubException ex ) {
-                       collector.reportError(ex);
-                       collector.fail(tuple);
-               } catch (ServiceBusException e) {
-                       collector.reportError(e);
-                       collector.fail(tuple);
-               }
-       }
-
-       @Override
-       public void cleanup() {
-               if(sender != null) {
-                       try {
-                               sender.close().whenComplete((voidargs,error)->{
-                                       try{
-                                               if(error!=null){
-                                                       logger.error("Exception 
during sender cleanup phase"+error.toString());
-                                               }
-                                               ehClient.closeSync();
-                                       }catch (Exception e){
-                                               logger.error("Exception during 
ehclient cleanup phase"+e.toString());
-                                       }
-                               }).get();
-                       } catch (InterruptedException e) {
-                               logger.error("Exception occured during cleanup 
phase"+e.toString());
-                       } catch (ExecutionException e) {
-                               logger.error("Exception occured during cleanup 
phase"+e.toString());
-                       }
-                       logger.info("Eventhub Bolt cleaned up");
-                       sender = null;
-                       ehClient =  null;
-               }
-       }
-
-       @Override
-       public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-       }
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory
+        .getLogger(EventHubBolt.class);
+
+    protected OutputCollector collector;
+    protected PartitionSender sender;
+    protected EventHubClient ehClient;
+    protected EventHubBoltConfig boltConfig;
+
+    public EventHubBolt(String connectionString, String entityPath) {
+        boltConfig = new EventHubBoltConfig(connectionString, entityPath);
+    }
+
+    public EventHubBolt(String userName, String password, String namespace,
+                        String entityPath, boolean partitionMode) {
+        boltConfig = new EventHubBoltConfig(userName, password, namespace,
+                                            entityPath, partitionMode);
+    }
+
+    public EventHubBolt(EventHubBoltConfig config) {
+        boltConfig = config;
+    }
+
+    @Override
+    public void prepare(Map<String, Object> config, TopologyContext context,
+                        OutputCollector collector) {
+        this.collector = collector;
+        String myPartitionId = null;
+        if (boltConfig.getPartitionMode()) {
+            // We can use the task index (starting from 0) as the partition ID
+            myPartitionId = "" + context.getThisTaskIndex();
+        }
+        logger.info("creating sender: " + boltConfig.getConnectionString()
+                    + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
+        try {
+            ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
+            if (boltConfig.getPartitionMode()) {
+                sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
+            }
+        } catch (Exception ex) {
+            collector.reportError(ex);
+            throw new RuntimeException(ex);
+        }
+
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
+            if (boltConfig.getPartitionMode() && sender != null) {
+                sender.sendSync(sendEvent);
+            } else if (boltConfig.getPartitionMode() && sender == null) {
+                throw new EventHubException("Sender is null");
+            } else if (!boltConfig.getPartitionMode() && ehClient != null) {
+                ehClient.sendSync(sendEvent);
+            } else if (!boltConfig.getPartitionMode() && ehClient == null) {
+                throw new EventHubException("ehclient is null");
+            }
+            collector.ack(tuple);
+        } catch (EventHubException ex) {
+            collector.reportError(ex);
+            collector.fail(tuple);
+        } catch (ServiceBusException e) {
+            collector.reportError(e);
+            collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        if (sender != null) {
+            try {
+                sender.close().whenComplete((voidargs, error) -> {
+                    try {
+                        if (error != null) {
+                            logger.error("Exception during sender cleanup phase" + error.toString());
+                        }
+                        ehClient.closeSync();
+                    } catch (Exception e) {
+                        logger.error("Exception during ehclient cleanup phase" + e.toString());
+                    }
+                }).get();
+            } catch (InterruptedException e) {
+                logger.error("Exception occured during cleanup phase" + e.toString());
+            } catch (ExecutionException e) {
+                logger.error("Exception occured during cleanup phase" + e.toString());
+            }
+            logger.info("Eventhub Bolt cleaned up");
+            sender = null;
+            ehClient = null;
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
index f5e1458..41e39e4 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -15,97 +15,94 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.bolt;
 
 import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-import java.io.Serializable;
-
 import java.io.Serializable;
+import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 
 /*
  * EventHubs bolt configurations
  *
  * Partition mode:
- * With partitionMode=true you need to create the same number of tasks as the 
number of 
+ * With partitionMode=true you need to create the same number of tasks as the number of
  * EventHubs partitions, and each bolt task will only send data to one partition.
  * The partition ID is the task ID of the bolt.
- * 
+ *
  * Event format:
  * The formatter to convert tuple to bytes for EventHubs.
  * if null, the default format is common delimited tuple fields.
  */
 public class EventHubBoltConfig implements Serializable {
-  private static final long serialVersionUID = 1L;
-  
-  private String connectionString;
-  private final String entityPath;
-  protected boolean partitionMode;
-  protected IEventDataFormat dataFormat;
-  
-  public EventHubBoltConfig(String connectionString, String entityPath) {
-    this(connectionString, entityPath, false, null);
-  }
-  
-  public EventHubBoltConfig(String connectionString, String entityPath,
-      boolean partitionMode) {
-    this(connectionString, entityPath, partitionMode, null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String entityPath, boolean partitionMode) {
-    this(userName, password, namespace,
-        EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
-  }
-  
-  public EventHubBoltConfig(String connectionString, String entityPath,
-      boolean partitionMode, IEventDataFormat dataFormat) {
-    this.connectionString = connectionString;
-    this.entityPath = entityPath;
-    this.partitionMode = partitionMode;
-    this.dataFormat = dataFormat;
-    if(this.dataFormat == null) {
-      this.dataFormat = new DefaultEventDataFormat();
+    private static final long serialVersionUID = 1L;
+    private final String entityPath;
+    protected boolean partitionMode;
+    protected IEventDataFormat dataFormat;
+    private String connectionString;
+
+    public EventHubBoltConfig(String connectionString, String entityPath) {
+        this(connectionString, entityPath, false, null);
+    }
+
+    public EventHubBoltConfig(String connectionString, String entityPath,
+                              boolean partitionMode) {
+        this(connectionString, entityPath, partitionMode, null);
+    }
+
+    public EventHubBoltConfig(String userName, String password, String namespace,
+                              String entityPath, boolean partitionMode) {
+        this(userName, password, namespace,
+             EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode);
+    }
+
+    public EventHubBoltConfig(String connectionString, String entityPath,
+                              boolean partitionMode, IEventDataFormat dataFormat) {
+        this.connectionString = connectionString;
+        this.entityPath = entityPath;
+        this.partitionMode = partitionMode;
+        this.dataFormat = dataFormat;
+        if (this.dataFormat == null) {
+            this.dataFormat = new DefaultEventDataFormat();
+        }
     }
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath) {
-    this(userName, password, namespace, targetFqnAddress, entityPath, false, 
null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath, boolean partitionMode) {
-    this(userName, password, namespace, targetFqnAddress, entityPath, 
partitionMode, null);
-  }
-  
-  public EventHubBoltConfig(String userName, String password, String namespace,
-      String targetFqnAddress, String entityPath, boolean partitionMode,
-      IEventDataFormat dataFormat) {
-    this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
-            userName,password).toString();
-    this.entityPath = entityPath;
-    this.partitionMode = partitionMode;
-    this.dataFormat = dataFormat;
-    if(this.dataFormat == null) {
-      this.dataFormat = new DefaultEventDataFormat();
+
+    public EventHubBoltConfig(String userName, String password, String namespace,
+                              String targetFqnAddress, String entityPath) {
+        this(userName, password, namespace, targetFqnAddress, entityPath, false, null);
+    }
+
+    public EventHubBoltConfig(String userName, String password, String namespace,
+                              String targetFqnAddress, String entityPath, boolean partitionMode) {
+        this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null);
+    }
+
+    public EventHubBoltConfig(String userName, String password, String namespace,
+                              String targetFqnAddress, String entityPath, boolean partitionMode,
+                              IEventDataFormat dataFormat) {
+        this.connectionString = new ConnectionStringBuilder(namespace, entityPath,
+                                                            userName, password).toString();
+        this.entityPath = entityPath;
+        this.partitionMode = partitionMode;
+        this.dataFormat = dataFormat;
+        if (this.dataFormat == null) {
+            this.dataFormat = new DefaultEventDataFormat();
+        }
+    }
+
+    public String getConnectionString() {
+        return connectionString;
+    }
+
+    public String getEntityPath() {
+        return entityPath;
+    }
+
+    public boolean getPartitionMode() {
+        return partitionMode;
+    }
+
+    public IEventDataFormat getEventDataFormat() {
+        return dataFormat;
     }
-  }
-  
-  public String getConnectionString() {
-    return connectionString;
-  }
-  
-  public String getEntityPath() {
-    return entityPath;
-  }
-  
-  public boolean getPartitionMode() {
-    return partitionMode;
-  }
-  
-  public IEventDataFormat getEventDataFormat() {
-    return dataFormat;
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
index d2aacb7..ec09460 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.bolt;
 
 import java.io.Serializable;
@@ -24,5 +25,5 @@ import org.apache.storm.tuple.Tuple;
  * Serialize a tuple to a byte array to be sent to EventHubs
  */
 public interface IEventDataFormat extends Serializable {
-  public byte[] serialize(Tuple tuple);
+    public byte[] serialize(Tuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
index bbd46ea..4c8e0a2 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
@@ -15,60 +15,58 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.microsoft.azure.eventhubs.EventData;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * An Event Data Scheme which deserializes message payload into the raw bytes.
  *
- * The resulting tuple would contain three items, the first being the message
- * bytes, and the second a map of properties that include metadata, which can 
be
- * used to determine who processes the message, and how it is processed.The 
third is
- * the system properties which exposes information like enqueue-time, offset 
and
- * sequence number
+ * The resulting tuple would contain three items, the first being the message bytes, and the second a map of properties that include
+ * metadata, which can be used to determine who processes the message, and how it is processed.The third is the system properties which
+ * exposes information like enqueue-time, offset and sequence number
  */
 public class BinaryEventDataScheme implements IEventDataScheme {
 
-       private static final Logger logger = 
LoggerFactory.getLogger(BinaryEventDataScheme.class);
-       @Override
-       public List<Object> deserialize(EventData eventData){
-               final List<Object> fieldContents = new ArrayList<Object>();
-               byte [] messageData = null;
-               if (eventData.getBytes() != null) {
-                       messageData = eventData.getBytes();
-               }
-               else if (eventData.getObject()!=null) {
-                       try {
-                               messageData = 
SerializeDeserializeUtil.serialize(eventData.getObject());
-                       } catch (IOException e) {
-                               logger.error("Failed to serialize EventData 
payload class"
-                                               + 
eventData.getObject().getClass());
-                               logger.error("Exception encountered while 
serializing EventData payload is"
-                                               + e.toString());
-                               throw new RuntimeException(e);
-                       }
-               }
-               Map<String, Object> metaDataMap =  eventData.getProperties();
-               Map<String, Object> systemMetaDataMap = 
eventData.getSystemProperties();
-               fieldContents.add(messageData);
-               fieldContents.add(metaDataMap);
-               fieldContents.add(systemMetaDataMap);
-               return fieldContents;
-       }
+    private static final Logger logger = LoggerFactory.getLogger(BinaryEventDataScheme.class);
+
+    @Override
+    public List<Object> deserialize(EventData eventData) {
+        final List<Object> fieldContents = new ArrayList<Object>();
+        byte[] messageData = null;
+        if (eventData.getBytes() != null) {
+            messageData = eventData.getBytes();
+        } else if (eventData.getObject() != null) {
+            try {
+                messageData = SerializeDeserializeUtil.serialize(eventData.getObject());
+            } catch (IOException e) {
+                logger.error("Failed to serialize EventData payload class"
+                             + eventData.getObject().getClass());
+                logger.error("Exception encountered while serializing EventData payload is"
+                             + e.toString());
+                throw new RuntimeException(e);
+            }
+        }
+        Map<String, Object> metaDataMap = eventData.getProperties();
+        Map<String, Object> systemMetaDataMap = eventData.getSystemProperties();
+        fieldContents.add(messageData);
+        fieldContents.add(metaDataMap);
+        fieldContents.add(systemMetaDataMap);
+        return fieldContents;
+    }
 
-       @Override
-       public Fields getOutputFields() {
-               return new Fields(FieldConstants.Message, 
FieldConstants.META_DATA,
-                               FieldConstants.SYSTEM_META_DATA);
-       }
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(FieldConstants.Message, FieldConstants.META_DATA,
+                          FieldConstants.SYSTEM_META_DATA);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
index 9fbcecf..9bd0c22 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -15,64 +15,62 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.microsoft.azure.eventhubs.EventData;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * An Event Data Scheme which deserializes message payload into the Strings. No
- * encoding is assumed. The receiver will need to handle parsing of the string
- * data in appropriate encoding.
+ * An Event Data Scheme which deserializes message payload into the Strings. No encoding is assumed. The receiver will need to handle
+ * parsing of the string data in appropriate encoding.
  *
- * The resulting tuple would contain two items: the the message string, and a
- * map of properties that include metadata, which can be used to determine who
- * processes the message, and how it is processed.
- * 
- * For passing the raw bytes of a messsage to Bolts, refer to
- * {@link BinaryEventDataScheme}.
+ * The resulting tuple would contain two items: the the message string, and a map of properties that include metadata, which can be used to
+ * determine who processes the message, and how it is processed.
+ *
+ * For passing the raw bytes of a messsage to Bolts, refer to {@link BinaryEventDataScheme}.
  */
 public class EventDataScheme implements IEventDataScheme {
 
-       private static final long serialVersionUID = 1L;
-       private static final Logger logger = 
LoggerFactory.getLogger(EventDataScheme.class);
-       @Override
-       public List<Object> deserialize(EventData eventData) {
-               final List<Object> fieldContents = new ArrayList<Object>();
-               String messageData = "";
-               if (eventData.getBytes()!=null) {
-                       messageData = new String(eventData.getBytes());
-               }
-               /*Will only serialize AMQPValue type*/
-               else if (eventData.getObject()!=null) {
-                       try {
-                               if (!(eventData.getObject() instanceof List)) {
-                                       messageData = 
eventData.getObject().toString();
-                               } else {
-                                       throw new RuntimeException("Cannot 
serialize the given AMQP type");
-                               }
-                       } catch (RuntimeException e) {
-                               logger.error("Failed to serialize EventData 
payload class"
-                                               + 
eventData.getObject().getClass());
-                               logger.error("Exception encountered while 
serializing EventData payload is"
-                                               + e.toString());
-                               throw e;
-                       }
-               }
-               Map<String, Object> metaDataMap = eventData.getProperties();
-                fieldContents.add(messageData);
-               fieldContents.add(metaDataMap);
-               return fieldContents;
-       }
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = 
LoggerFactory.getLogger(EventDataScheme.class);
+
+    @Override
+    public List<Object> deserialize(EventData eventData) {
+        final List<Object> fieldContents = new ArrayList<Object>();
+        String messageData = "";
+        if (eventData.getBytes() != null) {
+            messageData = new String(eventData.getBytes());
+        }
+        /*Will only serialize AMQPValue type*/
+        else if (eventData.getObject() != null) {
+            try {
+                if (!(eventData.getObject() instanceof List)) {
+                    messageData = eventData.getObject().toString();
+                } else {
+                    throw new RuntimeException("Cannot serialize the given 
AMQP type");
+                }
+            } catch (RuntimeException e) {
+                logger.error("Failed to serialize EventData payload class"
+                             + eventData.getObject().getClass());
+                logger.error("Exception encountered while serializing 
EventData payload is"
+                             + e.toString());
+                throw e;
+            }
+        }
+        Map<String, Object> metaDataMap = eventData.getProperties();
+        fieldContents.add(messageData);
+        fieldContents.add(metaDataMap);
+        return fieldContents;
+    }
 
-       @Override
-       public Fields getOutputFields() {
-               return new Fields(FieldConstants.Message, 
FieldConstants.META_DATA);
-       }
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(FieldConstants.Message, FieldConstants.META_DATA);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
index 5eeb4d2..fc23c05 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java
@@ -15,34 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.microsoft.azure.eventhubs.EventData;
 
 public class EventDataWrap implements Comparable<EventDataWrap> {
-  private final EventData eventData;
-  private final MessageId messageId;
-
-  public EventDataWrap(EventData eventdata, MessageId messageId) {
-    this.eventData = eventdata;
-    this.messageId = messageId;
-  }
-
-  public static EventDataWrap create(EventData eventData, MessageId messageId) 
{
-    return new EventDataWrap(eventData, messageId);
-  }
-
-  public EventData getEventData() {
-    return this.eventData;
-  }
-
-  public MessageId getMessageId() {
-    return this.messageId;
-  }
-
-  @Override
-  public int compareTo(EventDataWrap ed) {
-    return messageId.getSequenceNumber().
-        compareTo(ed.getMessageId().getSequenceNumber());
-  }
+    private final EventData eventData;
+    private final MessageId messageId;
+
+    public EventDataWrap(EventData eventdata, MessageId messageId) {
+        this.eventData = eventdata;
+        this.messageId = messageId;
+    }
+
+    public static EventDataWrap create(EventData eventData, MessageId 
messageId) {
+        return new EventDataWrap(eventData, messageId);
+    }
+
+    public EventData getEventData() {
+        return this.eventData;
+    }
+
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    @Override
+    public int compareTo(EventDataWrap ed) {
+        return messageId.getSequenceNumber().
+            compareTo(ed.getMessageId().getSequenceNumber());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
index a375380..509191d 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java
@@ -15,32 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import java.time.Instant;
 
-public class EventHubFilter implements IEventFilter{
+public class EventHubFilter implements IEventFilter {
 
     String offset = null;
     Instant time = null;
 
-    public EventHubFilter(String offset){
+    public EventHubFilter(String offset) {
         this.offset = offset;
         this.time = null;
     }
 
-    public EventHubFilter(Instant time){
+    public EventHubFilter(Instant time) {
         this.time = time;
         this.offset = null;
     }
 
     @Override
-    public String getOffset(){
+    public String getOffset() {
         return offset;
     }
 
     @Override
-    public Instant getTime(){
+    public Instant getTime() {
         return time;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
index 459b9bc..83dc850 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -15,152 +15,149 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
 import com.microsoft.azure.servicebus.ServiceBusException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import org.apache.storm.metric.api.CountMetric;
 import org.apache.storm.metric.api.MeanReducer;
 import org.apache.storm.metric.api.ReducedMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
 public class EventHubReceiverImpl implements IEventHubReceiver {
-  private static final Logger logger = 
LoggerFactory.getLogger(EventHubReceiverImpl.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(EventHubReceiverImpl.class);
 
-  private final String connectionString;
-  private final String entityName;
-  private final String partitionId;
-  private final String consumerGroupName;
+    private final String connectionString;
+    private final String entityName;
+    private final String partitionId;
+    private final String consumerGroupName;
 
-  private PartitionReceiver receiver;
-  private EventHubClient ehClient;
-  private ReducedMetric receiveApiLatencyMean;
-  private CountMetric receiveApiCallCount;
-  private CountMetric receiveMessageCount;
+    private PartitionReceiver receiver;
+    private EventHubClient ehClient;
+    private ReducedMetric receiveApiLatencyMean;
+    private CountMetric receiveApiCallCount;
+    private CountMetric receiveMessageCount;
 
-  public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
-    this.connectionString = config.getConnectionString();
-    this.entityName = config.getEntityPath();
-    this.partitionId = partitionId;
-    this.consumerGroupName = config.getConsumerGroupName();
-    receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
-    receiveApiCallCount = new CountMetric();
-    receiveMessageCount = new CountMetric();
-  }
+    public EventHubReceiverImpl(EventHubSpoutConfig config, String 
partitionId) {
+        this.connectionString = config.getConnectionString();
+        this.entityName = config.getEntityPath();
+        this.partitionId = partitionId;
+        this.consumerGroupName = config.getConsumerGroupName();
+        receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
+        receiveApiCallCount = new CountMetric();
+        receiveMessageCount = new CountMetric();
+    }
 
-  @Override
-  public void open(IEventFilter filter) throws EventHubException {
-    logger.info("creating eventhub receiver: partitionId=" + partitionId +
-            ", filter=" + filter.getOffset() != null ?
-            filter.getOffset() : 
Long.toString(filter.getTime().toEpochMilli()));
-    long start = System.currentTimeMillis();
-    try {
-      ehClient = 
EventHubClient.createFromConnectionStringSync(connectionString);
+    @Override
+    public void open(IEventFilter filter) throws EventHubException {
+        logger.info("creating eventhub receiver: partitionId=" + partitionId +
+                    ", filter=" + filter.getOffset() != null ?
+                        filter.getOffset() : 
Long.toString(filter.getTime().toEpochMilli()));
+        long start = System.currentTimeMillis();
+        try {
+            ehClient = 
EventHubClient.createFromConnectionStringSync(connectionString);
 
-      if (filter.getOffset()!=null) {
-        receiver = ehClient.createEpochReceiverSync(
-                   consumerGroupName,
-                   partitionId,
-                   filter.getOffset(),
-                   false,
-                   1);
-      }
-      else if (filter.getTime()!=null) {
-        receiver = ehClient.createEpochReceiverSync(
-                   consumerGroupName,
-                   partitionId,
-                   filter.getTime(),
-                   1);
-      }
-      else{
-        throw new RuntimeException("Eventhub receiver must have " +
-                "an offset or time to be created");
-      }
-    } catch (IOException e) {
-      logger.error("Exception in creating ehclient"+ e.toString());
-      throw new EventHubException(e);
-    }
-    catch (ServiceBusException e) {
-      logger.error("Exception in creating Receiver"+e.toString());
-      throw new EventHubException(e);
+            if (filter.getOffset() != null) {
+                receiver = ehClient.createEpochReceiverSync(
+                    consumerGroupName,
+                    partitionId,
+                    filter.getOffset(),
+                    false,
+                    1);
+            } else if (filter.getTime() != null) {
+                receiver = ehClient.createEpochReceiverSync(
+                    consumerGroupName,
+                    partitionId,
+                    filter.getTime(),
+                    1);
+            } else {
+                throw new RuntimeException("Eventhub receiver must have " +
+                                           "an offset or time to be created");
+            }
+        } catch (IOException e) {
+            logger.error("Exception in creating ehclient" + e.toString());
+            throw new EventHubException(e);
+        } catch (ServiceBusException e) {
+            logger.error("Exception in creating Receiver" + e.toString());
+            throw new EventHubException(e);
+        }
+        long end = System.currentTimeMillis();
+        logger.info("created eventhub receiver, time taken(ms): " + (end - 
start));
     }
-    long end = System.currentTimeMillis();
-    logger.info("created eventhub receiver, time taken(ms): " + (end-start));
-  }
 
-  @Override
-  public void close() {
-    if(receiver != null) {
-      try {
-        receiver.close().whenComplete((voidargs,error)->{
-          try {
-            if (error!=null) {
-              logger.error("Exception during receiver close 
phase"+error.toString());
+    @Override
+    public void close() {
+        if (receiver != null) {
+            try {
+                receiver.close().whenComplete((voidargs, error) -> {
+                    try {
+                        if (error != null) {
+                            logger.error("Exception during receiver close 
phase" + error.toString());
+                        }
+                        ehClient.closeSync();
+                    } catch (Exception e) {
+                        logger.error("Exception during ehclient close phase" + 
e.toString());
+                    }
+                }).get();
+            } catch (InterruptedException e) {
+                logger.error("Exception occured during close phase" + 
e.toString());
+            } catch (ExecutionException e) {
+                logger.error("Exception occured during close phase" + 
e.toString());
             }
-            ehClient.closeSync();
-          } catch (Exception e) {
-            logger.error("Exception during ehclient close phase"+e.toString());
-          }
-        }).get();
-      } catch (InterruptedException e) {
-        logger.error("Exception occured during close phase"+e.toString());
-      } catch (ExecutionException e) {
-        logger.error("Exception occured during close phase"+e.toString());
-      }
-      logger.info("closed eventhub receiver: partitionId=" + partitionId );
-      receiver = null;
-      ehClient =  null;
+            logger.info("closed eventhub receiver: partitionId=" + 
partitionId);
+            receiver = null;
+            ehClient = null;
+        }
     }
-  }
-
 
-  @Override
-  public boolean isOpen() {
-    return (receiver != null);
-  }
 
-  @Override
-  public EventDataWrap receive() {
-    long start = System.currentTimeMillis();
-    Iterable<EventData> receivedEvents=null;
-    /*Get one message at a time for backward compatibility behaviour*/
-    try {
-      receivedEvents = receiver.receiveSync(1);
-    } catch (ServiceBusException e) {
-      logger.error("Exception occured during receive"+e.toString());
-      return null;
+    @Override
+    public boolean isOpen() {
+        return (receiver != null);
     }
-    long end = System.currentTimeMillis();
-    long millis = (end - start);
-    receiveApiLatencyMean.update(millis);
-    receiveApiCallCount.incr();
 
-    if (receivedEvents == null || 
receivedEvents.spliterator().getExactSizeIfKnown() == 0) {
-      return null;
-    }
-    receiveMessageCount.incr();
-    EventData receivedEvent = receivedEvents.iterator().next();
-    MessageId messageId = new MessageId(partitionId,
-            receivedEvent.getSystemProperties().getOffset(),
-            receivedEvent.getSystemProperties().getSequenceNumber());
+    @Override
+    public EventDataWrap receive() {
+        long start = System.currentTimeMillis();
+        Iterable<EventData> receivedEvents = null;
+        /*Get one message at a time for backward compatibility behaviour*/
+        try {
+            receivedEvents = receiver.receiveSync(1);
+        } catch (ServiceBusException e) {
+            logger.error("Exception occured during receive" + e.toString());
+            return null;
+        }
+        long end = System.currentTimeMillis();
+        long millis = (end - start);
+        receiveApiLatencyMean.update(millis);
+        receiveApiCallCount.incr();
+
+        if (receivedEvents == null || 
receivedEvents.spliterator().getExactSizeIfKnown() == 0) {
+            return null;
+        }
+        receiveMessageCount.incr();
+        EventData receivedEvent = receivedEvents.iterator().next();
+        MessageId messageId = new MessageId(partitionId,
+                                            
receivedEvent.getSystemProperties().getOffset(),
+                                            
receivedEvent.getSystemProperties().getSequenceNumber());
 
-    return EventDataWrap.create(receivedEvent,messageId);
-  }
+        return EventDataWrap.create(receivedEvent, messageId);
+    }
 
-  @Override
-  public Map<String, Object> getMetricsData() {
-    Map<String, Object> ret = new HashMap<>();
-    ret.put(partitionId + "/receiveApiLatencyMean", 
receiveApiLatencyMean.getValueAndReset());
-    ret.put(partitionId + "/receiveApiCallCount", 
receiveApiCallCount.getValueAndReset());
-    ret.put(partitionId + "/receiveMessageCount", 
receiveMessageCount.getValueAndReset());
-    return ret;
-  }
+    @Override
+    public Map<String, Object> getMetricsData() {
+        Map<String, Object> ret = new HashMap<>();
+        ret.put(partitionId + "/receiveApiLatencyMean", 
receiveApiLatencyMean.getValueAndReset());
+        ret.put(partitionId + "/receiveApiCallCount", 
receiveApiCallCount.getValueAndReset());
+        ret.put(partitionId + "/receiveMessageCount", 
receiveMessageCount.getValueAndReset());
+        return ret;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
index d8c3d09..f8e144e 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
@@ -15,9 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.google.common.base.Strings;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -27,236 +32,233 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
 public class EventHubSpout extends BaseRichSpout {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(EventHubSpout.class);
-
-  private final UUID instanceId;
-  private final EventHubSpoutConfig eventHubConfig;
-  private final IEventDataScheme scheme;
-  private final int checkpointIntervalInSeconds;
-
-  private IStateStore stateStore;
-  private IPartitionCoordinator partitionCoordinator;
-  private IPartitionManagerFactory pmFactory;
-  private IEventHubReceiverFactory recvFactory;
-  private SpoutOutputCollector collector;
-  private long lastCheckpointTime;
-  private int currentPartitionIndex = -1;
-
-  public EventHubSpout(String username, String password, String namespace,
-      String entityPath, int partitionCount) {
-    this(new EventHubSpoutConfig(username, password, namespace, entityPath, 
partitionCount));
-  }
-
-  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
-    this(spoutConfig, null, null, null);
-  }
-  
-  public EventHubSpout(EventHubSpoutConfig spoutConfig,
-      IStateStore store,
-      IPartitionManagerFactory pmFactory,
-      IEventHubReceiverFactory recvFactory) {
-    this.eventHubConfig = spoutConfig;
-    this.scheme = spoutConfig.getEventDataScheme();
-    this.instanceId = UUID.randomUUID();
-    this.checkpointIntervalInSeconds = 
spoutConfig.getCheckpointIntervalInSeconds();
-    this.lastCheckpointTime = System.currentTimeMillis();
-    stateStore = store;
-    this.pmFactory = pmFactory;
-    if(this.pmFactory == null) {
-      this.pmFactory = new IPartitionManagerFactory() {
-        @Override
-        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
-            String partitionId, IStateStore stateStore,
-            IEventHubReceiver receiver) {
-          return new PartitionManager(spoutConfig, partitionId,
-              stateStore, receiver);
+    private static final Logger logger = 
LoggerFactory.getLogger(EventHubSpout.class);
+
+    private final UUID instanceId;
+    private final EventHubSpoutConfig eventHubConfig;
+    private final IEventDataScheme scheme;
+    private final int checkpointIntervalInSeconds;
+
+    private IStateStore stateStore;
+    private IPartitionCoordinator partitionCoordinator;
+    private IPartitionManagerFactory pmFactory;
+    private IEventHubReceiverFactory recvFactory;
+    private SpoutOutputCollector collector;
+    private long lastCheckpointTime;
+    private int currentPartitionIndex = -1;
+
+    public EventHubSpout(String username, String password, String namespace,
+                         String entityPath, int partitionCount) {
+        this(new EventHubSpoutConfig(username, password, namespace, 
entityPath, partitionCount));
+    }
+
+    public EventHubSpout(EventHubSpoutConfig spoutConfig) {
+        this(spoutConfig, null, null, null);
+    }
+
+    public EventHubSpout(EventHubSpoutConfig spoutConfig,
+                         IStateStore store,
+                         IPartitionManagerFactory pmFactory,
+                         IEventHubReceiverFactory recvFactory) {
+        this.eventHubConfig = spoutConfig;
+        this.scheme = spoutConfig.getEventDataScheme();
+        this.instanceId = UUID.randomUUID();
+        this.checkpointIntervalInSeconds = 
spoutConfig.getCheckpointIntervalInSeconds();
+        this.lastCheckpointTime = System.currentTimeMillis();
+        stateStore = store;
+        this.pmFactory = pmFactory;
+        if (this.pmFactory == null) {
+            this.pmFactory = new IPartitionManagerFactory() {
+                @Override
+                public IPartitionManager create(EventHubSpoutConfig 
spoutConfig,
+                                                String partitionId, 
IStateStore stateStore,
+                                                IEventHubReceiver receiver) {
+                    return new PartitionManager(spoutConfig, partitionId,
+                                                stateStore, receiver);
+                }
+            };
+        }
+        this.recvFactory = recvFactory;
+        if (this.recvFactory == null) {
+            this.recvFactory = new IEventHubReceiverFactory() {
+                @Override
+                public IEventHubReceiver create(EventHubSpoutConfig 
spoutConfig,
+                                                String partitionId) {
+                    return new EventHubReceiverImpl(spoutConfig, partitionId);
+                }
+            };
         }
-      };
+
     }
-    this.recvFactory = recvFactory;
-    if(this.recvFactory == null) {
-      this.recvFactory = new IEventHubReceiverFactory() {
-        @Override
-        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
-            String partitionId) {
-          return new EventHubReceiverImpl(spoutConfig, partitionId);
+
+    /**
+     * This is a extracted method that is easy to test
+     *
+     * @param config
+     * @param totalTasks
+     * @param taskIndex
+     * @param collector
+     * @throws Exception
+     */
+    public void preparePartitions(Map<String, Object> config, int totalTasks, 
int taskIndex, SpoutOutputCollector collector) throws
+        Exception {
+        this.collector = collector;
+        if (stateStore == null) {
+            String zkEndpointAddress = eventHubConfig.getZkConnectionString();
+            if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
+                //use storm's zookeeper servers if not specified.
+                @SuppressWarnings("unchecked")
+                List<String> zkServers = (List<String>) 
config.get(Config.STORM_ZOOKEEPER_SERVERS);
+                Integer zkPort = ((Number) 
config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+                StringBuilder sb = new StringBuilder();
+                for (String zk : zkServers) {
+                    if (sb.length() > 0) {
+                        sb.append(',');
+                    }
+                    sb.append(zk + ":" + zkPort);
+                }
+                zkEndpointAddress = sb.toString();
+            }
+            stateStore = new ZookeeperStateStore(zkEndpointAddress,
+                                                 
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
+                                                 
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
+        }
+        stateStore.open();
+
+        partitionCoordinator = new StaticPartitionCoordinator(
+            eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, 
recvFactory);
+
+        for (IPartitionManager partitionManager :
+            partitionCoordinator.getMyPartitionManagers()) {
+            partitionManager.open();
         }
-      };
     }
-    
-  }
-  
-  /**
-   * This is a extracted method that is easy to test
-   * @param config
-   * @param totalTasks
-   * @param taskIndex
-   * @param collector
-   * @throws Exception
-   */
-  public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
-    this.collector = collector;
-    if(stateStore == null) {
-      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
-      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
-        //use storm's zookeeper servers if not specified.
-        @SuppressWarnings("unchecked")
-        List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS);
-        Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
-        StringBuilder sb = new StringBuilder();
-        for (String zk : zkServers) {
-          if (sb.length() > 0) {
-            sb.append(',');
-          }
-          sb.append(zk+":"+zkPort);
+
+    @Override
+    public void open(Map<String, Object> config, TopologyContext context, SpoutOutputCollector collector) {
+        logger.info("begin:start open()");
+        String topologyName = (String) config.get(Config.TOPOLOGY_NAME);
+        eventHubConfig.setTopologyName(topologyName);
+
+        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
+        int taskIndex = context.getThisTaskIndex();
+        if (totalTasks > eventHubConfig.getPartitionCount()) {
+            throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");
+        }
+
+        logger.info(String.format("topologyName: %s, totalTasks: %d, taskIndex: %d", topologyName, totalTasks, taskIndex));
+
+        try {
+            preparePartitions(config, totalTasks, taskIndex, collector);
+        } catch (Exception e) {
+            collector.reportError(e);
+            throw new RuntimeException(e);
         }
-        zkEndpointAddress = sb.toString();
-      }
-      stateStore = new ZookeeperStateStore(zkEndpointAddress,
-          Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
-          Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
+
+        //register metrics
+        context.registerMetric("EventHubReceiver", new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                Map<String, Object> concatMetricsDataMaps = new HashMap<>();
+                for (IPartitionManager partitionManager :
+                    partitionCoordinator.getMyPartitionManagers()) {
+                    concatMetricsDataMaps.putAll(partitionManager.getMetricsData());
+                }
+                return concatMetricsDataMaps;
+            }
+        }, Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString()));
+        logger.info("end open()");
     }
-    stateStore.open();
 
-    partitionCoordinator = new StaticPartitionCoordinator(
-        eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, recvFactory);
+    @Override
+    public void nextTuple() {
+        EventDataWrap eventDatawrap = null;
+
+        List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers();
+        for (int i = 0; i < partitionManagers.size(); i++) {
+            currentPartitionIndex = (currentPartitionIndex + 1) % partitionManagers.size();
+            IPartitionManager partitionManager = partitionManagers.get(currentPartitionIndex);
+
+            if (partitionManager == null) {
+                throw new RuntimeException("partitionManager doesn't exist.");
+            }
+
+            eventDatawrap = partitionManager.receive();
+
+            if (eventDatawrap != null) {
+                break;
+            }
+        }
+
+        if (eventDatawrap != null) {
+            MessageId messageId = eventDatawrap.getMessageId();
+            List<Object> tuples = scheme.deserialize(eventDatawrap.getEventData());
+            if (tuples != null) {
+                collector.emit(tuples, messageId);
+            }
+        }
+
+        checkpointIfNeeded();
 
-    for (IPartitionManager partitionManager : 
-      partitionCoordinator.getMyPartitionManagers()) {
-      partitionManager.open();
+        // We don't need to sleep here because the IPartitionManager.receive() is
+        // a blocked call so it's fine to call this function in a tight loop.
     }
-  }
-
-  @Override
-  public void open(Map<String, Object> config, TopologyContext context, SpoutOutputCollector collector) {
-    logger.info("begin:start open()");
-    String topologyName = (String) config.get(Config.TOPOLOGY_NAME);
-    eventHubConfig.setTopologyName(topologyName);
-
-    int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
-    int taskIndex = context.getThisTaskIndex();
-    if (totalTasks > eventHubConfig.getPartitionCount()) {
-      throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");
+
+    @Override
+    public void ack(Object msgId) {
+        MessageId messageId = (MessageId) msgId;
+        IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
+        String offset = messageId.getOffset();
+        partitionManager.ack(offset);
     }
 
-    logger.info(String.format("topologyName: %s, totalTasks: %d, taskIndex: %d", topologyName, totalTasks, taskIndex));
+    @Override
+    public void fail(Object msgId) {
+        MessageId messageId = (MessageId) msgId;
+        IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
+        String offset = messageId.getOffset();
+        partitionManager.fail(offset);
+    }
 
-    try {
-      preparePartitions(config, totalTasks, taskIndex, collector);
-    } catch (Exception e) {
-         collector.reportError(e);
-      throw new RuntimeException(e);
+    @Override
+    public void deactivate() {
+        // let's checkpoint so that we can get the last checkpoint when restarting.
+        checkpoint();
     }
-    
-    //register metrics
-    context.registerMetric("EventHubReceiver", new IMetric() {
-      @Override
-      public Object getValueAndReset() {
-          Map<String, Object> concatMetricsDataMaps = new HashMap<>();
-          for (IPartitionManager partitionManager : 
+
+    @Override
+    public void close() {
+        for (IPartitionManager partitionManager :
             partitionCoordinator.getMyPartitionManagers()) {
-            concatMetricsDataMaps.putAll(partitionManager.getMetricsData());
-          }
-          return concatMetricsDataMaps;
-      }
-    }, Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString()));
-    logger.info("end open()");
-  }
-
-  @Override
-  public void nextTuple() {
-    EventDataWrap eventDatawrap = null;
-
-    List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers();
-    for (int i = 0; i < partitionManagers.size(); i++) {
-      currentPartitionIndex = (currentPartitionIndex + 1) % partitionManagers.size();
-      IPartitionManager partitionManager = partitionManagers.get(currentPartitionIndex);
-
-      if (partitionManager == null) {
-        throw new RuntimeException("partitionManager doesn't exist.");
-      }
-
-      eventDatawrap = partitionManager.receive();
-
-      if (eventDatawrap != null) {
-        break;
-      }
+            partitionManager.close();
+        }
+        stateStore.close();
     }
 
-    if (eventDatawrap != null) {
-      MessageId messageId = eventDatawrap.getMessageId();
-      List<Object> tuples = scheme.deserialize(eventDatawrap.getEventData());
-      if (tuples != null) {
-        collector.emit(tuples, messageId);
-      }
-    }
-    
-    checkpointIfNeeded();
-
-    // We don't need to sleep here because the IPartitionManager.receive() is
-    // a blocked call so it's fine to call this function in a tight loop.
-  }
-
-  @Override
-  public void ack(Object msgId) {
-    MessageId messageId = (MessageId) msgId;
-    IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
-    String offset = messageId.getOffset();
-    partitionManager.ack(offset);
-  }
-
-  @Override
-  public void fail(Object msgId) {
-    MessageId messageId = (MessageId) msgId;
-    IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId());
-    String offset = messageId.getOffset();
-    partitionManager.fail(offset);
-  }
-
-  @Override
-  public void deactivate() {
-    // let's checkpoint so that we can get the last checkpoint when restarting.
-    checkpoint();
-  }
-
-  @Override
-  public void close() {
-    for (IPartitionManager partitionManager : 
-      partitionCoordinator.getMyPartitionManagers()) {
-      partitionManager.close();
-    }
-    stateStore.close();
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    if (Strings.isNullOrEmpty(eventHubConfig.getOutputStreamId())) {
-      declarer.declare(scheme.getOutputFields());
-    } else {
-      declarer.declareStream(eventHubConfig.getOutputStreamId(), scheme.getOutputFields());
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        if (Strings.isNullOrEmpty(eventHubConfig.getOutputStreamId())) {
+            declarer.declare(scheme.getOutputFields());
+        } else {
+            declarer.declareStream(eventHubConfig.getOutputStreamId(), scheme.getOutputFields());
+        }
     }
-  }
 
-  private void checkpointIfNeeded() {
-    long nextCheckpointTime = lastCheckpointTime + checkpointIntervalInSeconds * 1000;
-    if (nextCheckpointTime < System.currentTimeMillis()) {
+    private void checkpointIfNeeded() {
+        long nextCheckpointTime = lastCheckpointTime + checkpointIntervalInSeconds * 1000;
+        if (nextCheckpointTime < System.currentTimeMillis()) {
 
-      checkpoint();
-      lastCheckpointTime = System.currentTimeMillis();
+            checkpoint();
+            lastCheckpointTime = System.currentTimeMillis();
+        }
     }
-  }
-  
-  private void checkpoint() {
-    for (IPartitionManager partitionManager : 
-      partitionCoordinator.getMyPartitionManagers()) {
-      partitionManager.checkpoint();
+
+    private void checkpoint() {
+        for (IPartitionManager partitionManager :
+            partitionCoordinator.getMyPartitionManagers()) {
+            partitionManager.checkpoint();
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index 5556970..cd27b11 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -15,242 +15,241 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.servicebus.ConnectionStringBuilder;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
 public class EventHubSpoutConfig implements Serializable {
-       private static final long serialVersionUID = 1L;
-
-       public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net";
-       private final String userName;
-       private final String password;
-       private final String namespace;
-       private final String entityPath;
-       private final int partitionCount;
-
-       private String zkConnectionString = null; // if null then use zookeeper used
-                                                                                               // by Storm
-       private int checkpointIntervalInSeconds = 10;
-       private int receiverCredits = 1024;
-       private int maxPendingMsgsPerPartition = 1024;
-       private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means
-                                                                               // disabling filter
-       private String connectionString;
-       private String topologyName;
-       private IEventDataScheme scheme = new StringEventDataScheme();
-       private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
-       private String outputStreamId;
-
-
-       // These are mandatory parameters
-       public EventHubSpoutConfig(String username, String password,
-                       String namespace, String entityPath, int partitionCount) {
-               this.userName = username;
-               this.password = password;
-               this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
-                               username,password).toString();
-               this.namespace = namespace;
-               this.entityPath = entityPath;
-               this.partitionCount = partitionCount;
-       }
-
-       // Keep this constructor for backward compatibility
-       public EventHubSpoutConfig(String username, String password,
-                       String namespace, String entityPath, int partitionCount,
-                       String zkConnectionString) {
-               this(username, password, namespace, entityPath, partitionCount);
-               setZkConnectionString(zkConnectionString);
-       }
-
-       // Keep this constructor for backward compatibility
-       public EventHubSpoutConfig(String username, String password,
-                       String namespace, String entityPath, int partitionCount,
-                       String zkConnectionString, int checkpointIntervalInSeconds,
-                       int receiverCredits) {
-               this(username, password, namespace, entityPath, partitionCount,
-                               zkConnectionString);
-               setCheckpointIntervalInSeconds(checkpointIntervalInSeconds);
-               setReceiverCredits(receiverCredits);
-       }
-
-       public EventHubSpoutConfig(String username, String password,
-                       String namespace, String entityPath, int partitionCount,
-                       String zkConnectionString, int checkpointIntervalInSeconds,
-                       int receiverCredits, long enqueueTimeFilter) {
-               this(username, password, namespace, entityPath, partitionCount,
-                               zkConnectionString, checkpointIntervalInSeconds,
-                               receiverCredits);
-               setEnqueueTimeFilter(enqueueTimeFilter);
-       }
-
-       // Keep this constructor for backward compatibility
-       public EventHubSpoutConfig(String username, String password,
-                       String namespace, String entityPath, int partitionCount,
-                       String zkConnectionString, int checkpointIntervalInSeconds,
-                       int receiverCredits, int maxPendingMsgsPerPartition,
-                       long enqueueTimeFilter) {
-
-               this(username, password, namespace, entityPath, partitionCount,
-                               zkConnectionString, checkpointIntervalInSeconds,
-                               receiverCredits);
-               setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition);
-               setEnqueueTimeFilter(enqueueTimeFilter);
-       }
-
-       public String getNamespace() {
-               return namespace;
-       }
-
-       public String getEntityPath() {
-               return entityPath;
-       }
-
-       public int getPartitionCount() {
-               return partitionCount;
-       }
-
-       public String getZkConnectionString() {
-               return zkConnectionString;
-       }
-
-       public void setZkConnectionString(String value) {
-               zkConnectionString = value;
-       }
-
-       public EventHubSpoutConfig withZkConnectionString(String value) {
-               setZkConnectionString(value);
-               return this;
-       }
-
-       public int getCheckpointIntervalInSeconds() {
-               return checkpointIntervalInSeconds;
-       }
-
-       public void setCheckpointIntervalInSeconds(int value) {
-               checkpointIntervalInSeconds = value;
-       }
-
-       public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) {
-               setCheckpointIntervalInSeconds(value);
-               return this;
-       }
-
-       public int getReceiverCredits() {
-               return receiverCredits;
-       }
-
-       public void setReceiverCredits(int value) {
-               receiverCredits = value;
-       }
-
-       public EventHubSpoutConfig withReceiverCredits(int value) {
-               setReceiverCredits(value);
-               return this;
-       }
-
-       public int getMaxPendingMsgsPerPartition() {
-               return maxPendingMsgsPerPartition;
-       }
-
-       public void setMaxPendingMsgsPerPartition(int value) {
-               maxPendingMsgsPerPartition = value;
-       }
-
-       public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) {
-               setMaxPendingMsgsPerPartition(value);
-               return this;
-       }
-
-       public long getEnqueueTimeFilter() {
-               return enqueueTimeFilter;
-       }
-
-       public void setEnqueueTimeFilter(long value) {
-               enqueueTimeFilter = value;
-       }
-
-       public EventHubSpoutConfig withEnqueueTimeFilter(long value) {
-               setEnqueueTimeFilter(value);
-               return this;
-       }
-
-       public String getTopologyName() {
-               return topologyName;
-       }
-
-       public void setTopologyName(String value) {
-               topologyName = value;
-       }
-
-       public EventHubSpoutConfig withTopologyName(String value) {
-               setTopologyName(value);
-               return this;
-       }
-
-       public IEventDataScheme getEventDataScheme() {
-               return scheme;
-       }
-
-       public void setEventDataScheme(IEventDataScheme scheme) {
-               this.scheme = scheme;
-       }
-
-       public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) {
-               setEventDataScheme(value);
-               return this;
-       }
-
-       public String getConsumerGroupName() {
-               return consumerGroupName;
-       }
-
-       public void setConsumerGroupName(String value) {
-               consumerGroupName = value;
-       }
-
-       public EventHubSpoutConfig withConsumerGroupName(String value) {
-               setConsumerGroupName(value);
-               return this;
-       }
+    public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net";
+    private static final long serialVersionUID = 1L;
+    private final String userName;
+    private final String password;
+    private final String namespace;
+    private final String entityPath;
+    private final int partitionCount;
+
+    private String zkConnectionString = null; // if null then use zookeeper used
+    // by Storm
+    private int checkpointIntervalInSeconds = 10;
+    private int receiverCredits = 1024;
+    private int maxPendingMsgsPerPartition = 1024;
+    private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means
+    // disabling filter
+    private String connectionString;
+    private String topologyName;
+    private IEventDataScheme scheme = new StringEventDataScheme();
+    private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
+    private String outputStreamId;
+
+
+    // These are mandatory parameters
+    public EventHubSpoutConfig(String username, String password,
+                               String namespace, String entityPath, int partitionCount) {
+        this.userName = username;
+        this.password = password;
+        this.connectionString = new ConnectionStringBuilder(namespace, entityPath,
+                                                            username, password).toString();
+        this.namespace = namespace;
+        this.entityPath = entityPath;
+        this.partitionCount = partitionCount;
+    }
+
+    // Keep this constructor for backward compatibility
+    public EventHubSpoutConfig(String username, String password,
+                               String namespace, String entityPath, int partitionCount,
+                               String zkConnectionString) {
+        this(username, password, namespace, entityPath, partitionCount);
+        setZkConnectionString(zkConnectionString);
+    }
+
+    // Keep this constructor for backward compatibility
+    public EventHubSpoutConfig(String username, String password,
+                               String namespace, String entityPath, int partitionCount,
+                               String zkConnectionString, int checkpointIntervalInSeconds,
+                               int receiverCredits) {
+        this(username, password, namespace, entityPath, partitionCount,
+             zkConnectionString);
+        setCheckpointIntervalInSeconds(checkpointIntervalInSeconds);
+        setReceiverCredits(receiverCredits);
+    }
+
+    public EventHubSpoutConfig(String username, String password,
+                               String namespace, String entityPath, int partitionCount,
+                               String zkConnectionString, int checkpointIntervalInSeconds,
+                               int receiverCredits, long enqueueTimeFilter) {
+        this(username, password, namespace, entityPath, partitionCount,
+             zkConnectionString, checkpointIntervalInSeconds,
+             receiverCredits);
+        setEnqueueTimeFilter(enqueueTimeFilter);
+    }
+
+    // Keep this constructor for backward compatibility
+    public EventHubSpoutConfig(String username, String password,
+                               String namespace, String entityPath, int partitionCount,
+                               String zkConnectionString, int checkpointIntervalInSeconds,
+                               int receiverCredits, int maxPendingMsgsPerPartition,
+                               long enqueueTimeFilter) {
+
+        this(username, password, namespace, entityPath, partitionCount,
+             zkConnectionString, checkpointIntervalInSeconds,
+             receiverCredits);
+        setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition);
+        setEnqueueTimeFilter(enqueueTimeFilter);
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public String getEntityPath() {
+        return entityPath;
+    }
+
+    public int getPartitionCount() {
+        return partitionCount;
+    }
+
+    public String getZkConnectionString() {
+        return zkConnectionString;
+    }
+
+    public void setZkConnectionString(String value) {
+        zkConnectionString = value;
+    }
+
+    public EventHubSpoutConfig withZkConnectionString(String value) {
+        setZkConnectionString(value);
+        return this;
+    }
+
+    public int getCheckpointIntervalInSeconds() {
+        return checkpointIntervalInSeconds;
+    }
+
+    public void setCheckpointIntervalInSeconds(int value) {
+        checkpointIntervalInSeconds = value;
+    }
+
+    public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) {
+        setCheckpointIntervalInSeconds(value);
+        return this;
+    }
+
+    public int getReceiverCredits() {
+        return receiverCredits;
+    }
+
+    public void setReceiverCredits(int value) {
+        receiverCredits = value;
+    }
+
+    public EventHubSpoutConfig withReceiverCredits(int value) {
+        setReceiverCredits(value);
+        return this;
+    }
+
+    public int getMaxPendingMsgsPerPartition() {
+        return maxPendingMsgsPerPartition;
+    }
+
+    public void setMaxPendingMsgsPerPartition(int value) {
+        maxPendingMsgsPerPartition = value;
+    }
+
+    public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) {
+        setMaxPendingMsgsPerPartition(value);
+        return this;
+    }
+
+    public long getEnqueueTimeFilter() {
+        return enqueueTimeFilter;
+    }
+
+    public void setEnqueueTimeFilter(long value) {
+        enqueueTimeFilter = value;
+    }
+
+    public EventHubSpoutConfig withEnqueueTimeFilter(long value) {
+        setEnqueueTimeFilter(value);
+        return this;
+    }
+
+    public String getTopologyName() {
+        return topologyName;
+    }
+
+    public void setTopologyName(String value) {
+        topologyName = value;
+    }
+
+    public EventHubSpoutConfig withTopologyName(String value) {
+        setTopologyName(value);
+        return this;
+    }
+
+    public IEventDataScheme getEventDataScheme() {
+        return scheme;
+    }
+
+    public void setEventDataScheme(IEventDataScheme scheme) {
+        this.scheme = scheme;
+    }
+
+    public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) {
+        setEventDataScheme(value);
+        return this;
+    }
+
+    public String getConsumerGroupName() {
+        return consumerGroupName;
+    }
+
+    public void setConsumerGroupName(String value) {
+        consumerGroupName = value;
+    }
+
+    public EventHubSpoutConfig withConsumerGroupName(String value) {
+        setConsumerGroupName(value);
+        return this;
+    }
 
-       public List<String> getPartitionList() {
-               List<String> partitionList = new ArrayList<String>();
-
-               for (int i = 0; i < this.partitionCount; i++) {
-                       partitionList.add(Integer.toString(i));
-               }
-
-               return partitionList;
-       }
-
-       public String getConnectionString() {
-               return connectionString;
-       }
-
-       /*Keeping it for backward compatibility*/
-       public void setTargetAddress(String targetFqnAddress) {
-       }
-
-       public void setTargetAddress(){
+    public List<String> getPartitionList() {
+        List<String> partitionList = new ArrayList<String>();
+
+        for (int i = 0; i < this.partitionCount; i++) {
+            partitionList.add(Integer.toString(i));
+        }
+
+        return partitionList;
+    }
+
+    public String getConnectionString() {
+        return connectionString;
+    }
+
+    /*Keeping it for backward compatibility*/
+    public void setTargetAddress(String targetFqnAddress) {
+    }
+
+    public void setTargetAddress() {
 
-       }
+    }
 
-       public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) {
-               setTargetAddress(targetFqnAddress);
-               return this;
-       }
-
-       public String getOutputStreamId() {
-               return outputStreamId;
-       }
-
-       public void setOutputStreamId(String outputStreamId) {
-               this.outputStreamId = outputStreamId;
-       }
+    public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) {
+        setTargetAddress(targetFqnAddress);
+        return this;
+    }
+
+    public String getOutputStreamId() {
+        return outputStreamId;
+    }
+
+    public void setOutputStreamId(String outputStreamId) {
+        this.outputStreamId = outputStreamId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
index 88855eb..1baf16f 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -15,14 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 public class FieldConstants {
 
-  public static final String PartitionKey = "partitionKey";
-  public static final String Offset = "offset";
-  public static final String Message = "message";
-  public static final String META_DATA = "metadata";
-  public static final String SYSTEM_META_DATA = "eventdata_system_properties";
-  public static final String DefaultStartingOffset = "-1";
+    public static final String PartitionKey = "partitionKey";
+    public static final String Offset = "offset";
+    public static final String Message = "message";
+    public static final String META_DATA = "metadata";
+    public static final String SYSTEM_META_DATA = "eventdata_system_properties";
+    public static final String DefaultStartingOffset = "-1";
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
index 854da6f..f2aa158 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -15,30 +15,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import com.microsoft.azure.eventhubs.EventData;
-import org.apache.storm.tuple.Fields;
-
 import java.io.Serializable;
 import java.util.List;
+import org.apache.storm.tuple.Fields;
 
 public interface IEventDataScheme extends Serializable {
 
-  /**
-   * Deserialize an AMQP Message into a Tuple.
-   *
-   * @see #getOutputFields() for the list of fields the tuple will contain.
-   *
-   * @param eventData The EventData to Deserialize.
-   * @return A tuple containing the deserialized fields of the message.
-   */
-  List<Object> deserialize(EventData eventData);
+    /**
+     * Deserialize an AMQP Message into a Tuple.
+     *
+     * @param eventData The EventData to Deserialize.
+     * @return A tuple containing the deserialized fields of the message.
+     *
+     * @see #getOutputFields() for the list of fields the tuple will contain.
+     */
+    List<Object> deserialize(EventData eventData);
 
-  /**
-   * Retrieve the Fields that are present on tuples created by this object.
-   *
-   * @return The Fields that are present on tuples created by this object.
-   */
-  Fields getOutputFields();
+    /**
+     * Retrieve the Fields that are present on tuples created by this object.
+     *
+     * @return The Fields that are present on tuples created by this object.
+     */
+    Fields getOutputFields();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
index c8de8bc..4af967f 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
@@ -15,19 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.eventhubs.spout;
 
 import java.util.Map;
 
 public interface IEventHubReceiver {
 
-  void open(IEventFilter filter) throws EventHubException;
+    void open(IEventFilter filter) throws EventHubException;
 
-  void close();
+    void close();
 
-  boolean isOpen();
+    boolean isOpen();
 
-  EventDataWrap receive();
+    EventDataWrap receive();
 
-  Map<String, Object> getMetricsData();
+    Map<String, Object> getMetricsData();
 }

Reply via email to