Remove the bundled client, add a reference to the eventhubs-client package, and use
ResilientEventHubReceiver

Signed-off-by: Shanyu Zhao <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85aeb3d4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85aeb3d4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85aeb3d4

Branch: refs/heads/master
Commit: 85aeb3d48efcaa28d6fc5dbfe6ce87af9f3e2615
Parents: 1f13f15
Author: Shanyu Zhao <[email protected]>
Authored: Sun May 17 01:14:01 2015 -0700
Committer: Shanyu Zhao <[email protected]>
Committed: Sun May 17 01:14:01 2015 -0700

----------------------------------------------------------------------
 external/storm-eventhubs/pom.xml                |  13 +-
 .../storm/eventhubs/bolt/EventHubBolt.java      |   6 +-
 .../eventhubs/bolt/EventHubBoltConfig.java      |   4 +-
 .../client/ConnectionStringBuilder.java         | 116 ----------------
 .../storm/eventhubs/client/Constants.java       |  32 -----
 .../storm/eventhubs/client/EventHubClient.java  |  95 -------------
 .../eventhubs/client/EventHubConsumerGroup.java |  72 ----------
 .../eventhubs/client/EventHubException.java     |  37 -----
 .../eventhubs/client/EventHubReceiver.java      | 139 -------------------
 .../eventhubs/client/EventHubSendClient.java    |  70 ----------
 .../storm/eventhubs/client/EventHubSender.java  |  99 -------------
 .../storm/eventhubs/client/SelectorFilter.java  |  38 -----
 .../eventhubs/client/SelectorFilterWriter.java  |  64 ---------
 .../eventhubs/spout/EventHubReceiverFilter.java |  56 --------
 .../eventhubs/spout/EventHubReceiverImpl.java   |  50 +++----
 .../eventhubs/spout/EventHubSpoutConfig.java    |  31 +----
 .../eventhubs/spout/IEventHubReceiver.java      |   5 +-
 .../spout/IEventHubReceiverFilter.java          |  35 -----
 .../eventhubs/spout/SimplePartitionManager.java |  11 +-
 .../spout/StaticPartitionCoordinator.java       |   2 +-
 .../TransactionalTridentEventHubEmitter.java    |   2 +-
 .../trident/TridentPartitionManager.java        |  12 +-
 .../eventhubs/spout/EventHubReceiverMock.java   |  18 +--
 23 files changed, 56 insertions(+), 951 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 2dfb739..6d4a47b 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -33,7 +33,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <qpid.version>0.32</qpid.version>
+        <eventhubs.client.version>0.9</eventhubs.client.version>
     </properties>
     <build>
         <plugins>
@@ -77,14 +77,9 @@
     </build>
     <dependencies>
         <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-client</artifactId>
-            <version>${qpid.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
-            <version>${qpid.version}</version>
+            <groupId>com.microsoft.eventhubs.client</groupId>
+            <artifactId>eventhubs-client</artifactId>
+            <version>${eventhubs.client.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index a817744..9acf7fa 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -22,9 +22,9 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.eventhubs.client.EventHubClient;
-import org.apache.storm.eventhubs.client.EventHubException;
-import org.apache.storm.eventhubs.client.EventHubSender;
+import com.microsoft.eventhubs.client.EventHubClient;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubSender;
 
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
index 4383a72..10b4e39 100644
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java
@@ -20,6 +20,7 @@ package org.apache.storm.eventhubs.bolt;
 import java.io.Serializable;
 
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
+import com.microsoft.eventhubs.client.ConnectionStringBuilder;
 
 /*
  * EventHubs bolt configurations
@@ -80,7 +81,8 @@ public class EventHubBoltConfig implements Serializable {
   public EventHubBoltConfig(String userName, String password, String namespace,
       String targetFqnAddress, String entityPath, boolean partitionMode,
       IEventDataFormat dataFormat) {
-    this.connectionString = EventHubSpoutConfig.buildConnectionString(userName, password, namespace, targetFqnAddress);
+    this.connectionString = new ConnectionStringBuilder(userName, password,
+               namespace, targetFqnAddress).getConnectionString();
     this.entityPath = entityPath;
     this.partitionMode = partitionMode;
     this.dataFormat = dataFormat;

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
deleted file mode 100755
index 518c88d..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/ConnectionStringBuilder.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- 
*******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.net.URLDecoder;
-import java.net.URLStreamHandler;
-
-public class ConnectionStringBuilder {
-
-  private final String connectionString;
-
-  private String host;
-  private int port;
-  private String userName;
-  private String password;
-  private boolean ssl;
-
-  // amqps://[username]:[password]@[namespace].servicebus.windows.net/
-  public ConnectionStringBuilder(String connectionString) throws 
EventHubException {
-    this.connectionString = connectionString;
-    this.initialize();
-  }
-
-  public String getHost() {
-    return this.host;
-  }
-
-  public void setHost(String value) {
-    this.host = value;
-  }
-
-  public int getPort() {
-    return this.port;
-  }
-
-  public void setPort(int value) {
-    this.port = value;
-  }
-
-  public String getUserName() {
-    return this.userName;
-  }
-
-  public void setUserName(String value) {
-    this.userName = value;
-  }
-
-  public String getPassword() {
-    return this.password;
-  }
-
-  public void setPassword(String value) {
-    this.password = value;
-  }
-
-  public boolean getSsl() {
-    return this.ssl;
-  }
-
-  public void setSsl(boolean value) {
-    this.ssl = value;
-  }
-
-  private void initialize() throws EventHubException {
-
-    URL url;
-    try {
-      url = new URL(null, this.connectionString, new NullURLStreamHandler());
-    } catch (MalformedURLException e) {
-      throw new EventHubException("connectionString is not valid.", e);
-    }
-
-    String protocol = url.getProtocol();
-    this.ssl = protocol.equalsIgnoreCase(Constants.SslScheme);
-    this.host = url.getHost();
-    this.port = url.getPort();
-
-    if (this.port == -1) {
-      this.port = this.ssl ? Constants.DefaultSslPort : Constants.DefaultPort;
-    }
-
-    String userInfo = url.getUserInfo();
-    if (userInfo != null) {
-      String[] credentials = userInfo.split(":", 2);
-      this.userName = URLDecoder.decode(credentials[0]);
-      this.password = URLDecoder.decode(credentials[1]);
-    }
-  }
-
-  class NullURLStreamHandler extends URLStreamHandler {
-
-    @Override
-    protected URLConnection openConnection(URL u) throws IOException {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
deleted file mode 100755
index d87ad53..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/Constants.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- 
*******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-public class Constants {
-
-  public static final String DefaultStartingOffset = "-1";
-  public static final String SelectorFilterName = 
"apache.org:selector-filter:string";
-  public static final String OffsetFilterFormatString = 
"amqp.annotation.x-opt-offset > '%s'";
-  public static final String EnqueueTimeFilterFormatString = 
"amqp.annotation.x-opt-enqueuedtimeutc > %d";
-  public static final String ConsumerAddressFormatString = 
"%s/ConsumerGroups/%s/Partitions/%s";
-  public static final String DestinationAddressFormatString = 
"%s/Partitions/%s";
-
-  public static final String SslScheme = "amqps";
-  public static final int DefaultPort = 5672;
-  public static final int DefaultSslPort = 5671;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
deleted file mode 100755
index 564a26f..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubClient.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- 
*******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import org.apache.qpid.amqp_1_0.client.Connection;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventHubClient {
-
-  private static final String DefaultConsumerGroupName = "$default";
-  private static final Logger logger = 
LoggerFactory.getLogger(EventHubClient.class);
-  private static final long ConnectionSyncTimeout = 60000L;
-
-  private final String connectionString;
-  private final String entityPath;
-  private final Connection connection;
-
-  private EventHubClient(String connectionString, String entityPath) throws 
EventHubException {
-    this.connectionString = connectionString;
-    this.entityPath = entityPath;
-    this.connection = this.createConnection();
-  }
-
-  /**
-   * creates a new instance of EventHubClient using the supplied connection 
string and entity path.
-   *
-   * @param connectionString connection string to the namespace of event hubs. 
connection string format:
-   * amqps://{userId}:{password}@{namespaceName}.servicebus.windows.net
-   * @param entityPath the name of event hub entity.
-   *
-   * @return EventHubClient
-   * @throws org.apache.storm.eventhubs.client.EventHubException
-   */
-  public static EventHubClient create(String connectionString, String 
entityPath) throws EventHubException {
-    return new EventHubClient(connectionString, entityPath);
-  }
-
-  public EventHubSender createPartitionSender(String partitionId) throws 
Exception {
-    return new EventHubSender(this.connection.createSession(), 
this.entityPath, partitionId);
-  }
-
-  public EventHubConsumerGroup getConsumerGroup(String cgName) {
-    if(cgName == null || cgName.length() == 0) {
-      cgName = DefaultConsumerGroupName;
-    }
-    return new EventHubConsumerGroup(connection, entityPath, cgName);
-  }
-
-  public void close() {
-    try {
-      this.connection.close();
-    } catch (ConnectionErrorException e) {
-      logger.error(e.toString());
-    }
-  }
-
-  private Connection createConnection() throws EventHubException {
-    ConnectionStringBuilder connectionStringBuilder = new 
ConnectionStringBuilder(this.connectionString);
-    Connection clientConnection;
-
-    try {
-      clientConnection = new Connection(
-        connectionStringBuilder.getHost(),
-        connectionStringBuilder.getPort(),
-        connectionStringBuilder.getUserName(),
-        connectionStringBuilder.getPassword(),
-        connectionStringBuilder.getHost(),
-        connectionStringBuilder.getSsl());
-    } catch (ConnectionException e) {
-      logger.error(e.toString());
-      throw new EventHubException(e);
-    }
-    clientConnection.getEndpoint().setSyncTimeout(ConnectionSyncTimeout);
-    
SelectorFilterWriter.register(clientConnection.getEndpoint().getDescribedTypeRegistry());
-    return clientConnection;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
deleted file mode 100755
index 892ff9c..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubConsumerGroup.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- 
*******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import org.apache.qpid.amqp_1_0.client.Connection;
-import org.apache.qpid.amqp_1_0.client.ConnectionException;
-import org.apache.qpid.amqp_1_0.client.Session;
-
-public class EventHubConsumerGroup {
-
-  private final Connection connection;
-  private final String entityPath;
-  private final String consumerGroupName;
-
-  private Session session;
-
-  public EventHubConsumerGroup(Connection connection, String entityPath, 
String consumerGroupName) {
-    this.connection = connection;
-    this.entityPath = entityPath;
-    this.consumerGroupName = consumerGroupName;
-  }
-
-  public EventHubReceiver createReceiver(String partitionId, String 
startingOffset, int defaultCredits) throws EventHubException {
-    this.ensureSessionCreated();
-
-    if (startingOffset == null || startingOffset.equals("")) {
-      startingOffset = Constants.DefaultStartingOffset;
-    }
-
-    String filterStr = String.format(Constants.OffsetFilterFormatString, 
startingOffset);
-    return new EventHubReceiver(this.session, this.entityPath, 
this.consumerGroupName, partitionId, filterStr, defaultCredits);
-  }
-  
-  public EventHubReceiver createReceiver(String partitionId, long timeAfter, 
int defaultCredits) throws EventHubException {
-    this.ensureSessionCreated();
-
-    String filterStr = String.format(Constants.EnqueueTimeFilterFormatString, 
timeAfter);
-    return new EventHubReceiver(this.session, this.entityPath, 
this.consumerGroupName, partitionId, filterStr, defaultCredits);
-  }
-
-  public void close() {
-    if (this.session != null) {
-      this.session.close();
-    }
-  }
-
-  synchronized void ensureSessionCreated() throws EventHubException {
-
-    try {
-      if (this.session == null) {
-        this.session = this.connection.createSession();
-      }
-    } catch (ConnectionException e) {
-      throw new EventHubException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
deleted file mode 100755
index 3e94573..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- 
*******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-public class EventHubException extends Exception {
-
-  public EventHubException() {
-    super();
-  }
-
-  public EventHubException(String message) {
-    super(message);
-  }
-
-  public EventHubException(Throwable cause) {
-    super(cause);
-  }
-
-  public EventHubException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
deleted file mode 100755
index c8900a8..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubReceiver.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- 
*******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import java.util.Collections;
-import java.util.Map;
-import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.client.Session;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class EventHubReceiver {
-
-  private static final Logger logger = LoggerFactory
-      .getLogger(EventHubReceiver.class);
-  private static final String linkName = "eventhubs-receiver-link";
-
-  private final Session session;
-  private final String entityPath;
-  private final String consumerGroupName;
-  private final String partitionId;
-  private final String consumerAddress;
-  private final Map<Symbol, Filter> filters;
-  private final int defaultCredits;
-
-  private Receiver receiver;
-  private boolean isClosed;
-
-  public EventHubReceiver(Session session, String entityPath,
-      String consumerGroupName, String partitionId, String filterStr, int 
defaultCredits)
-      throws EventHubException {
-
-    this.session = session;
-    this.entityPath = entityPath;
-    this.consumerGroupName = consumerGroupName;
-    this.partitionId = partitionId;
-    this.consumerAddress = this.getConsumerAddress();
-    this.filters = Collections.singletonMap(
-        Symbol.valueOf(Constants.SelectorFilterName),
-        (Filter) new SelectorFilter(filterStr));
-    logger.info("receiver filter string: " + filterStr);
-    this.defaultCredits = defaultCredits;
-
-    this.ensureReceiverCreated();
-  }
-
-  // receive without timeout means wait until a message is delivered.
-  public Message receive() {
-    return this.receive(-1L);
-  }
-
-  public Message receive(long waitTimeInMilliseconds) {
-
-    this.checkIfClosed();
-
-    Message message = this.receiver.receive(waitTimeInMilliseconds);
-
-    if (message != null) {
-      // Let's acknowledge a message although EH service doesn't need it
-      // to avoid AMQP flow issue.
-      receiver.acknowledge(message);
-
-      return message;
-    } else {
-      this.checkError();
-    }
-
-    return null;
-  }
-
-  public void close() {
-    if (!isClosed) {
-      receiver.close();
-      isClosed = true;
-    }
-  }
-
-  private String getConsumerAddress() {
-    return String.format(Constants.ConsumerAddressFormatString,
-        entityPath, consumerGroupName, partitionId);
-  }
-
-  private void ensureReceiverCreated() throws EventHubException {
-    try {
-      logger.info("defaultCredits: " + defaultCredits);
-      receiver = session.createReceiver(consumerAddress,
-          AcknowledgeMode.ALO, linkName, false, filters, null);
-      receiver.setCredit(UnsignedInteger.valueOf(defaultCredits), true);
-    } catch (ConnectionErrorException e) {
-      // caller (EventHubSpout) will log the error
-      throw new EventHubException(e);
-    }
-  }
-
-  private void checkError() {
-    org.apache.qpid.amqp_1_0.type.transport.Error error = 
this.receiver.getError();
-    if (error != null) {
-      String errorMessage = error.toString();
-      logger.error(errorMessage);
-      this.close();
-
-      throw new RuntimeException(errorMessage);
-    } else {
-      // adding a sleep here to avoid any potential tight-loop issue.
-      try {
-        Thread.sleep(10);
-      } catch (InterruptedException e) {
-        logger.error(e.toString());
-      }
-    }
-  }
-  
-  private void checkIfClosed() {
-    if (this.isClosed) {
-      throw new RuntimeException("receiver was closed.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
deleted file mode 100755
index ad31cc1..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSendClient.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- 
*******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
-
-public class EventHubSendClient {
-  
-  public static void main(String[] args) throws Exception {
-    
-    if (args == null || args.length < 7) {
-      throw new IllegalArgumentException(
-        "arguments are missing. [username] [password] [namespace] [entityPath] 
[partitionId] [messageSize] [messageCount] are required.");
-    }
-    
-    String username = args[0];
-    String password = args[1];
-    String namespace = args[2];
-    String entityPath = args[3];
-    String partitionId = args[4];
-    int messageSize = Integer.parseInt(args[5]);
-    int messageCount = Integer.parseInt(args[6]);
-    assert(messageSize > 0);
-    assert(messageCount > 0);
-    
-    if (partitionId.equals("-1")) {
-      // -1 means we want to send data to partitions in round-robin fashion.
-      partitionId = null;
-    }
-    
-    try {
-      String connectionString = 
EventHubSpoutConfig.buildConnectionString(username, password, namespace);
-      EventHubClient client = EventHubClient.create(connectionString, 
entityPath);
-      EventHubSender sender = client.createPartitionSender(partitionId);
-      
-      StringBuilder sb = new StringBuilder(messageSize);
-      for(int i=1; i<messageCount+1; ++i) {
-        while(sb.length() < messageSize) {
-          sb.append(" current message: " + i);
-        }
-        sb.setLength(messageSize);
-        sender.send(sb.toString());
-        sb.setLength(0);
-        if(i % 1000 == 0) {
-          System.out.println("Number of messages sent: " + i);
-        }
-      }
-      System.out.println("Total Number of messages sent: " + messageCount);
-    } catch (Exception e) {
-      System.out.println("Exception: " + e.getMessage());
-    }
-    
-    System.out.println("done");
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
deleted file mode 100755
index 435893e..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/EventHubSender.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import java.util.concurrent.TimeoutException;
-import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Sender;
-import org.apache.qpid.amqp_1_0.client.Session;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EventHubSender {
-
-  private static final Logger logger = LoggerFactory.getLogger(EventHubSender.class);
-
-  private final Session session;
-  private final String entityPath;
-  private final String partitionId;
-  private final String destinationAddress;
-
-  private Sender sender;
-
-  public EventHubSender(Session session, String entityPath, String partitionId) {
-    this.session = session;
-    this.entityPath = entityPath;
-    this.partitionId = partitionId;
-    this.destinationAddress = this.getDestinationAddress();
-  }
-  
-  public void send(byte[] data) throws EventHubException {
-    try {
-      if (this.sender == null) {
-        this.ensureSenderCreated();
-      }
-
-      Binary bin = new Binary(data);
-      Message message = new Message(new Data(bin));
-      this.sender.send(message);
-
-    } catch (LinkDetachedException e) {
-      logger.error(e.getMessage());
-
-      EventHubException eventHubException = new EventHubException("Sender has been closed");
-      throw eventHubException;
-    } catch (TimeoutException e) {
-      logger.error(e.getMessage());
-
-      EventHubException eventHubException = new EventHubException("Timed out while waiting to get credit to send");
-      throw eventHubException;
-    } catch (Exception e) {
-      logger.error(e.getMessage());
-    }
-  }
-
-  public void send(String data) throws EventHubException {
-    //For interop with other language, convert string to bytes
-    send(data.getBytes());
-  }
-
-  public void close() {
-    try {
-      this.sender.close();
-    } catch (Sender.SenderClosingException e) {
-      logger.error("Closing a sender encountered error: " + e.getMessage());
-    }
-  }
-
-  private String getDestinationAddress() {
-    if (this.partitionId == null || this.partitionId.equals("")) {
-      return this.entityPath;
-    } else {
-      return String.format(Constants.DestinationAddressFormatString, this.entityPath, this.partitionId);
-    }
-  }
-
-  private synchronized void ensureSenderCreated() throws Exception {
-    if (this.sender == null) {
-      this.sender = this.session.createSender(this.destinationAddress);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
deleted file mode 100755
index 7869cce..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-
-public class SelectorFilter implements Filter {
-
-  private final String value;
-
-  public SelectorFilter(String value) {
-    this.value = value;
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  @Override
-  public String toString() {
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
deleted file mode 100755
index 102b6b6..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/client/SelectorFilterWriter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.client;
-
-import org.apache.qpid.amqp_1_0.codec.AbstractDescribedTypeWriter;
-import org.apache.qpid.amqp_1_0.codec.ValueWriter;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-
-public class SelectorFilterWriter extends
-  AbstractDescribedTypeWriter<SelectorFilter> {
-
-  private static final ValueWriter.Factory<SelectorFilter> FACTORY = new ValueWriter.Factory<SelectorFilter>() {
-
-    @Override
-    public ValueWriter<SelectorFilter> newInstance(ValueWriter.Registry registry) {
-      return new SelectorFilterWriter(registry);
-    }
-  };
-
-  private SelectorFilter value;
-
-  public SelectorFilterWriter(final ValueWriter.Registry registry) {
-    super(registry);
-  }
-
-  public static void register(ValueWriter.Registry registry) {
-    registry.register(SelectorFilter.class, FACTORY);
-  }
-
-  @Override
-  protected void onSetValue(final SelectorFilter value) {
-    this.value = value;
-  }
-
-  @Override
-  protected void clear() {
-    value = null;
-  }
-
-  @Override
-  protected Object getDescriptor() {
-    return UnsignedLong.valueOf(0x00000137000000AL);
-  }
-
-  @Override
-  protected ValueWriter<String> createDescribedWriter() {
-    return getRegistry().getValueWriter(value.getValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
deleted file mode 100755
index e80cd25..0000000
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverFilter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-
-public class EventHubReceiverFilter implements IEventHubReceiverFilter {
-  String offset = null;
-  long enqueueTime = 0;
-  public EventHubReceiverFilter() {
-    
-  }
-  
-  public EventHubReceiverFilter(String offset) {
-    //Creates offset only filter
-    this.offset = offset;
-  }
-  
-  public EventHubReceiverFilter(long enqueueTime) {
-    //Creates enqueue time only filter
-    this.enqueueTime = enqueueTime;
-  }
-  
-  public void setOffset(String offset) {
-    this.offset = offset;
-  }
-  
-  public void setEnqueueTime(long enqueueTime) {
-    this.enqueueTime = enqueueTime;
-  }
-  
-  @Override
-  public String getOffset() {
-    return offset;
-  }
-
-  @Override
-  public long getEnqueueTime() {
-    return enqueueTime;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
index 68302af..7454af4 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -25,10 +25,10 @@ import backtype.storm.metric.api.CountMetric;
 import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 
-import org.apache.storm.eventhubs.client.Constants;
-import org.apache.storm.eventhubs.client.EventHubClient;
-import org.apache.storm.eventhubs.client.EventHubException;
-import org.apache.storm.eventhubs.client.EventHubReceiver;
+import com.microsoft.eventhubs.client.Constants;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.IEventHubFilter;
+import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -39,8 +39,8 @@ import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
 
 public class EventHubReceiverImpl implements IEventHubReceiver {
   private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
-  private static final Symbol OffsetKey = Symbol.valueOf("x-opt-offset");
-  private static final Symbol SequenceNumberKey = Symbol.valueOf("x-opt-sequence-number");
+  private static final Symbol OffsetKey = Symbol.valueOf(Constants.OffsetKey);
+  private static final Symbol SequenceNumberKey = Symbol.valueOf(Constants.SequenceNumberKey);
 
   private final String connectionString;
   private final String entityName;
@@ -48,8 +48,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
   private final int defaultCredits;
   private final String consumerGroupName;
 
-  private EventHubReceiver receiver;
-  private String lastOffset = null;
+  private ResilientEventHubReceiver receiver;
   private ReducedMetric receiveApiLatencyMean;
   private CountMetric receiveApiCallCount;
   private CountMetric receiveMessageCount;
@@ -66,27 +65,13 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
   }
 
   @Override
-  public void open(IEventHubReceiverFilter filter) throws EventHubException {
-    logger.info("creating eventhub receiver: partitionId=" + partitionId + ", offset=" + filter.getOffset()
-        + ", enqueueTime=" + filter.getEnqueueTime());
+  public void open(IEventHubFilter filter) throws EventHubException {
+    logger.info("creating eventhub receiver: partitionId=" + partitionId + 
+               ", filterString=" + filter.getFilterString());
     long start = System.currentTimeMillis();
-    EventHubClient eventHubClient = EventHubClient.create(connectionString, entityName);
-    if(filter.getOffset() != null) {
-      receiver = eventHubClient
-          .getConsumerGroup(consumerGroupName)
-          .createReceiver(partitionId, filter.getOffset(), defaultCredits);
-    }
-    else if(filter.getEnqueueTime() != 0) {
-      receiver = eventHubClient
-          .getConsumerGroup(consumerGroupName)
-          .createReceiver(partitionId, filter.getEnqueueTime(), defaultCredits);
-    }
-    else {
-      logger.error("Invalid IEventHubReceiverFilter, use default offset as filter");
-      receiver = eventHubClient
-          .getConsumerGroup(consumerGroupName)
-          .createReceiver(partitionId, Constants.DefaultStartingOffset, defaultCredits);
-    }
+    receiver = new ResilientEventHubReceiver(connectionString, entityName,
+               partitionId, consumerGroupName, defaultCredits, filter);
+    
     long end = System.currentTimeMillis();
     logger.info("created eventhub receiver, time taken(ms): " + (end-start));
   }
@@ -113,21 +98,20 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
     long millis = (end - start);
     receiveApiLatencyMean.update(millis);
     receiveApiCallCount.incr();
-
+    
     if (message == null) {
       //Temporary workaround for AMQP/EH bug of failing to receive messages
-      if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
+      /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
         throw new RuntimeException(
             "Restart EventHubSpout due to failure of receiving messages in "
             + millis + " millisecond");
-      }
+      }*/
       return null;
     }
+
     receiveMessageCount.incr();
 
-    //logger.info(String.format("received a message. PartitionId: %s, Offset: %s", partitionId, this.lastOffset));
     MessageId messageId = createMessageId(message);
-
     return EventData.create(message, messageId);
   }
   

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index 0238e40..77cd998 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -18,10 +18,9 @@
 package org.apache.storm.eventhubs.spout;
 
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
+import com.microsoft.eventhubs.client.ConnectionStringBuilder;
 
 public class EventHubSpoutConfig implements Serializable {
   private static final long serialVersionUID = 1L; 
@@ -48,7 +47,8 @@ public class EventHubSpoutConfig implements Serializable {
       String entityPath, int partitionCount) {
     this.userName = username;
     this.password = password;
-    this.connectionString = buildConnectionString(username, password, namespace);
+    this.connectionString = new ConnectionStringBuilder(username, password,
+               namespace).getConnectionString();
     this.namespace = namespace;
     this.entityPath = entityPath;
     this.partitionCount = partitionCount;
@@ -173,28 +173,7 @@ public class EventHubSpoutConfig implements Serializable {
   }
 
   public void setTargetAddress(String targetFqnAddress) {
-    this.connectionString = buildConnectionString(
-        userName, password, namespace, targetFqnAddress);
+    this.connectionString = new ConnectionStringBuilder(userName, password,
+               namespace, targetFqnAddress).getConnectionString();
   }
-
-  public static String buildConnectionString(String username, String password, String namespace) {
-    return buildConnectionString(username, password, namespace, EH_SERVICE_FQDN_SUFFIX);
-  }
-
-  public static String buildConnectionString(String username, String password,
-      String namespace, String targetFqnAddress) {
-    return "amqps://" + username + ":" + encodeString(password)
-        + "@" + namespace + "." + targetFqnAddress;
-  }    
-
-  private static String encodeString(String input) {
-    try {
-      return URLEncoder.encode(input, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      //We don't need to throw this exception because the exception won't
-      //happen because of user input. Our unit tests will catch this error.
-      return "";
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
index 45e9e57..bc2db14 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java
@@ -19,11 +19,12 @@ package org.apache.storm.eventhubs.spout;
 
 import java.util.Map;
 
-import org.apache.storm.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.IEventHubFilter;
 
 public interface IEventHubReceiver {
 
-  void open(IEventHubReceiverFilter filter) throws EventHubException;
+  void open(IEventHubFilter filter) throws EventHubException;
 
   void close();
   

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
deleted file mode 100755
index e5b93cf..0000000
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiverFilter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *******************************************************************************/
-package org.apache.storm.eventhubs.spout;
-
-/**
- * The filter to create an EventHubs receiver
- */
-public interface IEventHubReceiverFilter {
-  /**
-   * Get offset to filter events based on offset 
-   * @return null if offset not set
-   */
-  String getOffset();
-  
-  /**
-   * Get timestamp to filter events based on enqueue time.
-   * @return 0 if enqueue time is not set
-   */
-  long getEnqueueTime();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
index bcbcbac..b66a785 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java
@@ -22,7 +22,10 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.eventhubs.client.Constants;
+import com.microsoft.eventhubs.client.Constants;
+import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
+import com.microsoft.eventhubs.client.EventHubOffsetFilter;
+import com.microsoft.eventhubs.client.IEventHubFilter;
 
 /**
  * A simple partition manager that does not re-send failed messages
@@ -62,13 +65,13 @@ public class SimplePartitionManager implements IPartitionManager {
       offset = Constants.DefaultStartingOffset;
     }
 
-    EventHubReceiverFilter filter = new EventHubReceiverFilter();
+    IEventHubFilter filter;
     if (offset.equals(Constants.DefaultStartingOffset)
         && config.getEnqueueTimeFilter() != 0) {
-      filter.setEnqueueTime(config.getEnqueueTimeFilter());
+      filter = new EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter());
     }
     else {
-      filter.setOffset(offset);
+      filter = new EventHubOffsetFilter(offset);
     }
 
     receiver.open(filter);

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
index 3f5f156..8d2c485 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StaticPartitionCoordinator.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.eventhubs.client.Constants;
+import com.microsoft.eventhubs.client.Constants;
 
 public class StaticPartitionCoordinator implements IPartitionCoordinator {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
index 2b92c3c..bf7f339 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java
@@ -29,7 +29,7 @@ import org.apache.storm.eventhubs.spout.EventHubReceiverImpl;
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventHubReceiver;
 import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-import org.apache.storm.eventhubs.client.Constants;
+import com.microsoft.eventhubs.client.Constants;
 
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
index 60391c3..159fe41 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java
@@ -23,10 +23,12 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.eventhubs.client.Constants;
-import org.apache.storm.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.Constants;
+import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubOffsetFilter;
+
 import org.apache.storm.eventhubs.spout.EventData;
-import org.apache.storm.eventhubs.spout.EventHubReceiverFilter;
 import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
 import org.apache.storm.eventhubs.spout.IEventHubReceiver;
 
@@ -47,10 +49,10 @@ public class TridentPartitionManager implements ITridentPartitionManager {
     try {
       if((offset == null || offset.equals(Constants.DefaultStartingOffset)) 
         && spoutConfig.getEnqueueTimeFilter() != 0) {
-          receiver.open(new EventHubReceiverFilter(spoutConfig.getEnqueueTimeFilter()));
+          receiver.open(new EventHubEnqueueTimeFilter(spoutConfig.getEnqueueTimeFilter()));
       }
       else {
-        receiver.open(new EventHubReceiverFilter(offset));
+        receiver.open(new EventHubOffsetFilter(offset));
       }
       lastOffset = offset;
       return true;

http://git-wip-us.apache.org/repos/asf/storm/blob/85aeb3d4/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
index 740ef63..b176598 100755
--- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
+++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java
@@ -24,14 +24,15 @@ import java.util.Map;
 import org.apache.storm.eventhubs.spout.MessageId;
 import org.apache.storm.eventhubs.spout.EventData;
 import org.apache.storm.eventhubs.spout.IEventHubReceiver;
-
 import org.apache.qpid.amqp_1_0.client.Message;
 import org.apache.qpid.amqp_1_0.jms.impl.TextMessageImpl;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.messaging.Data;
 
-import org.apache.storm.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubOffsetFilter;
+import com.microsoft.eventhubs.client.IEventHubFilter;
 
 /**
  * A mock receiver that emits fake data with offset starting from given offset
@@ -58,17 +59,8 @@ public class EventHubReceiverMock implements IEventHubReceiver {
   }
 
   @Override
-  public void open(IEventHubReceiverFilter filter) throws EventHubException {
-    if(filter.getOffset() != null) {
-      currentOffset = Long.parseLong(filter.getOffset());
-    }
-    else if(filter.getEnqueueTime() != 0) {
-      //assume if it's time based filter the offset matches the enqueue time.
-      currentOffset = filter.getEnqueueTime();
-    }
-    else {
-      throw new EventHubException("Invalid IEventHubReceiverFilter");
-    }
+  public void open(IEventHubFilter filter) throws EventHubException {
+    currentOffset = Long.parseLong(filter.getFilterValue());
     isOpen = true;
   }
 

Reply via email to