Jira 2127- Storm-eventhubs should use latest amqp and eventhubs-client versions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8f7a531
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8f7a531
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8f7a531

Branch: refs/heads/master
Commit: c8f7a531bb43e82146758a80ec1f1174d1f8ff3b
Parents: cd5c9e8
Author: Ravi Peri <ravip...@microsoft.com>
Authored: Tue Sep 27 14:06:16 2016 -0700
Committer: Ravi Peri <ravip...@microsoft.com>
Committed: Mon Oct 17 10:49:08 2016 -0700

----------------------------------------------------------------------
 external/storm-eventhubs/pom.xml                |  57 +--
 .../storm/eventhubs/bolt/EventHubBolt.java      | 111 +++---
 .../eventhubs/spout/BinaryEventDataScheme.java  |  66 ++++
 .../storm/eventhubs/spout/EventDataScheme.java  |  72 ++--
 .../storm/eventhubs/spout/EventHubSpout.java    |   8 +-
 .../eventhubs/spout/EventHubSpoutConfig.java    | 369 +++++++++++--------
 .../storm/eventhubs/spout/FieldConstants.java   |   1 +
 .../storm/eventhubs/spout/IEventDataScheme.java |  13 +
 .../eventhubs/spout/StringEventDataScheme.java  |  69 ++++
 pom.xml                                         |   2 +
 10 files changed, 507 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 195b7fd..452c971 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -17,24 +17,20 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
-    
+
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
         <version>2.0.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
-    
+
     <artifactId>storm-eventhubs</artifactId>
     <version>2.0.0-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>storm-eventhubs</name>
     <description>EventHubs Storm Spout</description>
 
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <eventhubs.client.version>0.9.1</eventhubs.client.version>
-    </properties>
     <build>
         <plugins>
             <plugin>
@@ -55,23 +51,23 @@
                     </transformers>
                     
<outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile>
                 </configuration>
-               </plugin>
+            </plugin>
             <plugin>
-                       <artifactId>maven-antrun-plugin</artifactId>
-                       <executions>
-                         <execution>
-                           <phase>package</phase>
-                           <configuration>
-                             <tasks>
-                               <copy 
file="src/main/resources/config.properties" 
tofile="target/eventhubs-config.properties"/>
-                    </tasks>
-                           </configuration>
-                           <goals>
-                             <goal>run</goal>
-                           </goals>
-                         </execution>
-                       </executions>
-               </plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <configuration>
+                            <tasks>
+                                <copy 
file="src/main/resources/config.properties" 
tofile="target/eventhubs-config.properties" />
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
     <dependencies>
@@ -104,10 +100,25 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-client</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-common</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.11</version>
             <scope>test</scope>
         </dependency>
-    </dependencies> 
+    </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index ac5018b..3d64cc5 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -36,66 +36,65 @@ import org.apache.storm.tuple.Tuple;
  * A bolt that writes event message to EventHub.
  */
 public class EventHubBolt extends BaseRichBolt {
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(EventHubBolt.class);
-  
-  protected OutputCollector collector;
-  protected EventHubSender sender;
-  protected EventHubBoltConfig boltConfig;
-  
-  
-  public EventHubBolt(String connectionString, String entityPath) {
-    boltConfig = new EventHubBoltConfig(connectionString, entityPath);
-  }
+       private static final long serialVersionUID = 1L;
+       private static final Logger logger = LoggerFactory
+                       .getLogger(EventHubBolt.class);
 
-  public EventHubBolt(String userName, String password, String namespace,
-      String entityPath, boolean partitionMode) {
-    boltConfig = new EventHubBoltConfig(userName, password, namespace,
-        entityPath, partitionMode);
-  }
-  
-  public EventHubBolt(EventHubBoltConfig config) {
-    boltConfig = config;
-  }
+       protected OutputCollector collector;
+       protected EventHubSender sender;
+       protected EventHubBoltConfig boltConfig;
 
-  @Override
-  public void prepare(Map config, TopologyContext context, OutputCollector 
collector) {
-    this.collector = collector;
-    String myPartitionId = null;
-    if(boltConfig.getPartitionMode()) {
-      //We can use the task index (starting from 0) as the partition ID
-      myPartitionId = "" + context.getThisTaskIndex();
-    }
-    logger.info("creating sender: " + boltConfig.getConnectionString()
-        + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
-    try {
-      EventHubClient eventHubClient = EventHubClient.create(
-          boltConfig.getConnectionString(), boltConfig.getEntityPath());
-      sender = eventHubClient.createPartitionSender(myPartitionId);
-    }
-    catch(Exception ex) {
-      logger.error(ex.getMessage());
-      throw new RuntimeException(ex);
-    }
+       public EventHubBolt(String connectionString, String entityPath) {
+               boltConfig = new EventHubBoltConfig(connectionString, 
entityPath);
+       }
 
-  }
+       public EventHubBolt(String userName, String password, String namespace,
+                       String entityPath, boolean partitionMode) {
+               boltConfig = new EventHubBoltConfig(userName, password, 
namespace,
+                               entityPath, partitionMode);
+       }
 
-  @Override
-  public void execute(Tuple tuple) {
-    try {
-      sender.send(boltConfig.getEventDataFormat().serialize(tuple));
-      collector.ack(tuple);
-    }
-    catch(EventHubException ex) {
-      logger.error(ex.getMessage());
-      collector.fail(tuple);
-    }
-  }
+       public EventHubBolt(EventHubBoltConfig config) {
+               boltConfig = config;
+       }
 
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    
-  }
+       @Override
+       public void prepare(Map config, TopologyContext context,
+                       OutputCollector collector) {
+               this.collector = collector;
+               String myPartitionId = null;
+               if (boltConfig.getPartitionMode()) {
+                       // We can use the task index (starting from 0) as the 
partition ID
+                       myPartitionId = "" + context.getThisTaskIndex();
+               }
+               logger.info("creating sender: " + 
boltConfig.getConnectionString()
+                               + ", " + boltConfig.getEntityPath() + ", " + 
myPartitionId);
+               try {
+                       EventHubClient eventHubClient = EventHubClient.create(
+                                       boltConfig.getConnectionString(),
+                                       boltConfig.getEntityPath());
+                       sender = 
eventHubClient.createPartitionSender(myPartitionId);
+               } catch (Exception ex) {
+                       collector.reportError(ex);
+                       throw new RuntimeException(ex);
+               }
+
+       }
+
+       @Override
+       public void execute(Tuple tuple) {
+               try {
+                       
sender.send(boltConfig.getEventDataFormat().serialize(tuple));
+                       collector.ack(tuple);
+               } catch (EventHubException ex) {
+                       collector.reportError(ex);
+                       collector.fail(tuple);
+               }
+       }
+
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
new file mode 100644
index 0000000..7b0d7e5
--- /dev/null
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
*******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Event Data Scheme which deserializes message payload into the raw bytes.
+ *
+ * The resulting tuple would contain two items, the first being the message
+ * bytes, and the second a map of properties that include metadata, which can 
be
+ * used to determine who processes the message, and how it is processed.
+ */
+public class BinaryEventDataScheme implements IEventDataScheme {
+
+       @Override
+       public List<Object> deserialize(Message message) {
+               final List<Object> fieldContents = new ArrayList<Object>();
+
+               Map metaDataMap = new HashMap();
+               byte[] messageData = new byte[0];
+
+               for (Section section : message.getPayload()) {
+                       if (section instanceof Data) {
+                               Data data = (Data) section;
+                               messageData = data.getValue().getArray();
+                       } else if (section instanceof ApplicationProperties) {
+                               final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
+                               metaDataMap = applicationProperties.getValue();
+                       }
+               }
+
+               fieldContents.add(messageData);
+               fieldContents.add(metaDataMap);
+               return fieldContents;
+       }
+
+       @Override
+       public Fields getOutputFields() {
+               return new Fields(FieldConstants.Message, 
FieldConstants.META_DATA);
+       }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
index 0e275a5..90cad0a 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -19,37 +19,59 @@ package org.apache.storm.eventhubs.spout;
 
 import org.apache.storm.tuple.Fields;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.qpid.amqp_1_0.client.Message;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
 import org.apache.qpid.amqp_1_0.type.messaging.Data;
 
+/**
+ * An Event Data Scheme which deserializes message payload into the Strings. No
+ * encoding is assumed. The receiver will need to handle parsing of the string
+ * data in appropriate encoding.
+ *
+ * The resulting tuple would contain two items: the the message string, and a
+ * map of properties that include metadata, which can be used to determine who
+ * processes the message, and how it is processed.
+ * 
+ * For passing the raw bytes of a messsage to Bolts, refer to
+ * {@link BinaryEventDataScheme}.
+ */
 public class EventDataScheme implements IEventDataScheme {
 
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public List<Object> deserialize(Message message) {
-    List<Object> fieldContents = new ArrayList<Object>();
-
-    for (Section section : message.getPayload()) {
-      if (section instanceof Data) {
-        Data data = (Data) section;
-        fieldContents.add(new String(data.getValue().getArray()));
-        return fieldContents;
-      } else if (section instanceof AmqpValue) {
-        AmqpValue amqpValue = (AmqpValue) section;
-        fieldContents.add(amqpValue.getValue().toString());
-        return fieldContents;
-      }
-    }
-
-    return null;
-  }
-
-  @Override
-  public Fields getOutputFields() {
-    return new Fields(FieldConstants.Message);
-  }
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public List<Object> deserialize(Message message) {
+               final List<Object> fieldContents = new ArrayList<Object>();
+
+               Map metaDataMap = new HashMap();
+               String messageData = "";
+
+               for (Section section : message.getPayload()) {
+                       if (section instanceof Data) {
+                               Data data = (Data) section;
+                               messageData = new 
String(data.getValue().getArray());
+                       } else if (section instanceof AmqpValue) {
+                               AmqpValue amqpValue = (AmqpValue) section;
+                               messageData = amqpValue.getValue().toString();
+                       } else if (section instanceof ApplicationProperties) {
+                               final ApplicationProperties 
applicationProperties = (ApplicationProperties) section;
+                               metaDataMap = applicationProperties.getValue();
+                       }
+               }
+
+               fieldContents.add(messageData);
+               fieldContents.add(metaDataMap);
+               return fieldContents;
+       }
+
+       @Override
+       public Fields getOutputFields() {
+               return new Fields(FieldConstants.Message, 
FieldConstants.META_DATA);
+       }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
index ff40315..662697d 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
@@ -121,8 +121,8 @@ public class EventHubSpout extends BaseRichSpout {
         zkEndpointAddress = sb.toString();
       }
       stateStore = new ZookeeperStateStore(zkEndpointAddress,
-          (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES),
-          (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL));
+          
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
+          
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
     }
     stateStore.open();
 
@@ -152,7 +152,7 @@ public class EventHubSpout extends BaseRichSpout {
     try {
       preparePartitions(config, totalTasks, taskIndex, collector);
     } catch (Exception e) {
-      logger.error(e.getMessage());
+         collector.reportError(e);
       throw new RuntimeException(e);
     }
     
@@ -167,7 +167,7 @@ public class EventHubSpout extends BaseRichSpout {
           }
           return concatMetricsDataMaps;
       }
-    }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+    }, 
Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString()));
     logger.info("end open()");
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index 77cd998..168b134 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -23,157 +23,220 @@ import java.util.List;
 import com.microsoft.eventhubs.client.ConnectionStringBuilder;
 
 public class EventHubSpoutConfig implements Serializable {
-  private static final long serialVersionUID = 1L; 
-
-  public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net";
-  private final String userName;
-  private final String password;
-  private final String namespace;
-  private final String entityPath;
-  private final int partitionCount;
-
-  private String zkConnectionString = null; //if null then use zookeeper used 
by Storm
-  private int checkpointIntervalInSeconds = 10;
-  private int receiverCredits = 1024;
-  private int maxPendingMsgsPerPartition = 1024;
-  private long enqueueTimeFilter = 0; //timestamp in millisecond, 0 means 
disabling filter
-  private String connectionString;
-  private String topologyName;
-  private IEventDataScheme scheme = new EventDataScheme();
-  private String consumerGroupName = null; //if null then use default consumer 
group
-
-  //These are mandatory parameters
-  public EventHubSpoutConfig(String username, String password, String 
namespace,
-      String entityPath, int partitionCount) {
-    this.userName = username;
-    this.password = password;
-    this.connectionString = new ConnectionStringBuilder(username, password,
-               namespace).getConnectionString();
-    this.namespace = namespace;
-    this.entityPath = entityPath;
-    this.partitionCount = partitionCount;
-  }
-
-  //Keep this constructor for backward compatibility
-  public EventHubSpoutConfig(String username, String password, String 
namespace,
-      String entityPath, int partitionCount, String zkConnectionString) {
-    this(username, password, namespace, entityPath, partitionCount);
-    setZkConnectionString(zkConnectionString);
-  }
-  
-  //Keep this constructor for backward compatibility
-  public EventHubSpoutConfig(String username, String password, String 
namespace,
-      String entityPath, int partitionCount, String zkConnectionString,
-      int checkpointIntervalInSeconds, int receiverCredits) {
-    this(username, password, namespace, entityPath, partitionCount,
-        zkConnectionString);
-    setCheckpointIntervalInSeconds(checkpointIntervalInSeconds);
-    setReceiverCredits(receiverCredits);
-  }
-
-  //Keep this constructor for backward compatibility
-  public EventHubSpoutConfig(String username, String password, String 
namespace,
-    String entityPath, int partitionCount, String zkConnectionString,
-    int checkpointIntervalInSeconds, int receiverCredits, int 
maxPendingMsgsPerPartition, long enqueueTimeFilter) {
-    
-    this(username, password, namespace, entityPath, partitionCount,
-        zkConnectionString, checkpointIntervalInSeconds, receiverCredits);
-    setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition);
-    setEnqueueTimeFilter(enqueueTimeFilter);
-  }
-
-  public String getNamespace() {
-    return namespace;
-  }
-
-  public String getEntityPath() {
-    return entityPath;
-  }
-
-  public int getPartitionCount() {
-    return partitionCount;
-  }
-
-  public String getZkConnectionString() {
-    return zkConnectionString;
-  }
-
-  public void setZkConnectionString(String value) {
-    zkConnectionString = value;
-  }
-
-  public int getCheckpointIntervalInSeconds() {
-    return checkpointIntervalInSeconds;
-  }
-
-  public void setCheckpointIntervalInSeconds(int value) {
-    checkpointIntervalInSeconds = value;
-  }
-  
-  public int getReceiverCredits() {
-    return receiverCredits;
-  }
-
-  public void setReceiverCredits(int value) {
-    receiverCredits = value;
-  }
-  
-  public int getMaxPendingMsgsPerPartition() {
-    return maxPendingMsgsPerPartition;
-  }
-
-  public void setMaxPendingMsgsPerPartition(int value) {
-    maxPendingMsgsPerPartition = value;
-  }
-  
-  public long getEnqueueTimeFilter() {
-    return enqueueTimeFilter;
-  }
-
-  public void setEnqueueTimeFilter(long value) {
-    enqueueTimeFilter = value;
-  }
-
-  public String getTopologyName() {
-    return topologyName;
-  }
-
-  public void setTopologyName(String value) {
-    topologyName = value;
-  }
-
-  public IEventDataScheme getEventDataScheme() {
-    return scheme;
-  }
-
-  public void setEventDataScheme(IEventDataScheme scheme) {
-    this.scheme = scheme;
-  }
-
-  public String getConsumerGroupName() {
-    return consumerGroupName;
-  }
-
-  public void setConsumerGroupName(String value) {
-    consumerGroupName = value;
-  }
-
-  public List<String> getPartitionList() {
-    List<String> partitionList = new ArrayList<String>();
-
-    for (int i = 0; i < this.partitionCount; i++) {
-      partitionList.add(Integer.toString(i));
-    }
-
-    return partitionList;
-  }
-
-  public String getConnectionString() {
-    return connectionString;
-  }
-
-  public void setTargetAddress(String targetFqnAddress) {
-    this.connectionString = new ConnectionStringBuilder(userName, password,
-               namespace, targetFqnAddress).getConnectionString();
-  }
+       private static final long serialVersionUID = 1L;
+
+       public static final String EH_SERVICE_FQDN_SUFFIX = 
"servicebus.windows.net";
+       private final String userName;
+       private final String password;
+       private final String namespace;
+       private final String entityPath;
+       private final int partitionCount;
+
+       private String zkConnectionString = null; // if null then use zookeeper 
used
+                                                                               
                // by Storm
+       private int checkpointIntervalInSeconds = 10;
+       private int receiverCredits = 1024;
+       private int maxPendingMsgsPerPartition = 1024;
+       private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means
+                                                                               
// disabling filter
+       private String connectionString;
+       private String topologyName;
+       private IEventDataScheme scheme = new StringEventDataScheme();
+       private String consumerGroupName = null; // if null then use default
+                                                                               
                // consumer group
+
+       // These are mandatory parameters
+       public EventHubSpoutConfig(String username, String password,
+                       String namespace, String entityPath, int 
partitionCount) {
+               this.userName = username;
+               this.password = password;
+               this.connectionString = new ConnectionStringBuilder(username, 
password,
+                               namespace).getConnectionString();
+               this.namespace = namespace;
+               this.entityPath = entityPath;
+               this.partitionCount = partitionCount;
+       }
+
+       // Keep this constructor for backward compatibility
+       public EventHubSpoutConfig(String username, String password,
+                       String namespace, String entityPath, int partitionCount,
+                       String zkConnectionString) {
+               this(username, password, namespace, entityPath, partitionCount);
+               setZkConnectionString(zkConnectionString);
+       }
+
+       // Keep this constructor for backward compatibility
+       public EventHubSpoutConfig(String username, String password,
+                       String namespace, String entityPath, int partitionCount,
+                       String zkConnectionString, int 
checkpointIntervalInSeconds,
+                       int receiverCredits) {
+               this(username, password, namespace, entityPath, partitionCount,
+                               zkConnectionString);
+               setCheckpointIntervalInSeconds(checkpointIntervalInSeconds);
+               setReceiverCredits(receiverCredits);
+       }
+
+       public EventHubSpoutConfig(String username, String password,
+                       String namespace, String entityPath, int partitionCount,
+                       String zkConnectionString, int 
checkpointIntervalInSeconds,
+                       int receiverCredits, long enqueueTimeFilter) {
+               this(username, password, namespace, entityPath, partitionCount,
+                               zkConnectionString, checkpointIntervalInSeconds,
+                               receiverCredits);
+               setEnqueueTimeFilter(enqueueTimeFilter);
+       }
+
+       // Keep this constructor for backward compatibility
+       public EventHubSpoutConfig(String username, String password,
+                       String namespace, String entityPath, int partitionCount,
+                       String zkConnectionString, int 
checkpointIntervalInSeconds,
+                       int receiverCredits, int maxPendingMsgsPerPartition,
+                       long enqueueTimeFilter) {
+
+               this(username, password, namespace, entityPath, partitionCount,
+                               zkConnectionString, checkpointIntervalInSeconds,
+                               receiverCredits);
+               setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition);
+               setEnqueueTimeFilter(enqueueTimeFilter);
+       }
+
+       public String getNamespace() {
+               return namespace;
+       }
+
+       public String getEntityPath() {
+               return entityPath;
+       }
+
+       public int getPartitionCount() {
+               return partitionCount;
+       }
+
+       public String getZkConnectionString() {
+               return zkConnectionString;
+       }
+
+       public void setZkConnectionString(String value) {
+               zkConnectionString = value;
+       }
+
+       public EventHubSpoutConfig withZkConnectionString(String value) {
+               setZkConnectionString(value);
+               return this;
+       }
+
+       public int getCheckpointIntervalInSeconds() {
+               return checkpointIntervalInSeconds;
+       }
+
+       public void setCheckpointIntervalInSeconds(int value) {
+               checkpointIntervalInSeconds = value;
+       }
+
+       public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) {
+               setCheckpointIntervalInSeconds(value);
+               return this;
+       }
+
+       public int getReceiverCredits() {
+               return receiverCredits;
+       }
+
+       public void setReceiverCredits(int value) {
+               receiverCredits = value;
+       }
+
+       public EventHubSpoutConfig withReceiverCredits(int value) {
+               setReceiverCredits(value);
+               return this;
+       }
+
+       public int getMaxPendingMsgsPerPartition() {
+               return maxPendingMsgsPerPartition;
+       }
+
+       public void setMaxPendingMsgsPerPartition(int value) {
+               maxPendingMsgsPerPartition = value;
+       }
+
+       public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) {
+               setMaxPendingMsgsPerPartition(value);
+               return this;
+       }
+
+       public long getEnqueueTimeFilter() {
+               return enqueueTimeFilter;
+       }
+
+       public void setEnqueueTimeFilter(long value) {
+               enqueueTimeFilter = value;
+       }
+
+       public EventHubSpoutConfig withEnqueueTimeFilter(long value) {
+               setEnqueueTimeFilter(value);
+               return this;
+       }
+
+       public String getTopologyName() {
+               return topologyName;
+       }
+
+       public void setTopologyName(String value) {
+               topologyName = value;
+       }
+
+       public EventHubSpoutConfig withTopologyName(String value) {
+               setTopologyName(value);
+               return this;
+       }
+
+       public IEventDataScheme getEventDataScheme() {
+               return scheme;
+       }
+
+       public void setEventDataScheme(IEventDataScheme scheme) {
+               this.scheme = scheme;
+       }
+
+       public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) {
+               setEventDataScheme(value);
+               return this;
+       }
+
+       public String getConsumerGroupName() {
+               return consumerGroupName;
+       }
+
+       public void setConsumerGroupName(String value) {
+               consumerGroupName = value;
+       }
+
+       public EventHubSpoutConfig withConsumerGroupName(String value) {
+               setConsumerGroupName(value);
+               return this;
+       }
+
+       public List<String> getPartitionList() {
+               List<String> partitionList = new ArrayList<String>();
+
+               for (int i = 0; i < this.partitionCount; i++) {
+                       partitionList.add(Integer.toString(i));
+               }
+
+               return partitionList;
+       }
+
+       public String getConnectionString() {
+               return connectionString;
+       }
+
+       public void setTargetAddress(String targetFqnAddress) {
+               this.connectionString = new ConnectionStringBuilder(userName, 
password,
+                               namespace, 
targetFqnAddress).getConnectionString();
+       }
+
+       public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) {
+               setTargetAddress(targetFqnAddress);
+               return this;
+       }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
index bd655d6..b238391 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -22,4 +22,5 @@ public class FieldConstants {
   public static final String PartitionKey = "partitionKey";
   public static final String Offset = "offset";
   public static final String Message = "message";
+  public static final String META_DATA = "metadata";
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
index b7e03b4..b8101b9 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -24,7 +24,20 @@ import org.apache.qpid.amqp_1_0.client.Message;
 
 public interface IEventDataScheme extends Serializable {
 
+  /**
+   * Deserialize an AMQP Message into a Tuple.
+   *
+   * @see #getOutputFields() for the list of fields the tuple will contain.
+   *
+   * @param message The Message to Deserialize.
+   * @return A tuple containing the deserialized fields of the message.
+   */
   List<Object> deserialize(Message message);
 
+  /**
+   * Retrieve the Fields that are present on tuples created by this object.
+   *
+   * @return The Fields that are present on tuples created by this object.
+   */
   Fields getOutputFields();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
new file mode 100644
index 0000000..0c6f8b6
--- /dev/null
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
*******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * An Event Data Scheme which deserializes message payload into the Strings.
+ * No encoding is assumed. The receiver will need to handle parsing of the 
+ * string data in appropriate encoding.
+ *
+ * Note: Unlike other schemes provided, this scheme does not include any 
+ * metadata. 
+ * 
+ * For metadata please refer to {@link BinaryEventDataScheme}, {@link 
EventDataScheme} 
+ */
+public class StringEventDataScheme implements IEventDataScheme {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public List<Object> deserialize(Message message) {
+    final List<Object> fieldContents = new ArrayList<Object>();
+
+    for (Section section : message.getPayload()) {
+      if (section instanceof Data) {
+        Data data = (Data) section;
+        fieldContents.add(new String(data.getValue().getArray()));
+      } else if (section instanceof AmqpValue) {
+        AmqpValue amqpValue = (AmqpValue) section;
+        fieldContents.add(amqpValue.getValue().toString());
+      }
+    }
+    
+    return fieldContents;
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(FieldConstants.Message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5c47f75..c0985a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -285,6 +285,8 @@
         <aetherVersion>1.0.0.v20140518</aetherVersion>
         <mavenVersion>3.1.0</mavenVersion>
         <wagonVersion>1.0</wagonVersion>
+        <qpid.version>0.32</qpid.version>
+        <eventhubs.client.version>1.0.1</eventhubs.client.version>
     </properties>
 
     <modules>

Reply via email to