Jira 2127- Storm-eventhubs should use latest amqp and eventhubs-client versions
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8f7a531 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8f7a531 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8f7a531 Branch: refs/heads/master Commit: c8f7a531bb43e82146758a80ec1f1174d1f8ff3b Parents: cd5c9e8 Author: Ravi Peri <ravip...@microsoft.com> Authored: Tue Sep 27 14:06:16 2016 -0700 Committer: Ravi Peri <ravip...@microsoft.com> Committed: Mon Oct 17 10:49:08 2016 -0700 ---------------------------------------------------------------------- external/storm-eventhubs/pom.xml | 57 +-- .../storm/eventhubs/bolt/EventHubBolt.java | 111 +++--- .../eventhubs/spout/BinaryEventDataScheme.java | 66 ++++ .../storm/eventhubs/spout/EventDataScheme.java | 72 ++-- .../storm/eventhubs/spout/EventHubSpout.java | 8 +- .../eventhubs/spout/EventHubSpoutConfig.java | 369 +++++++++++-------- .../storm/eventhubs/spout/FieldConstants.java | 1 + .../storm/eventhubs/spout/IEventDataScheme.java | 13 + .../eventhubs/spout/StringEventDataScheme.java | 69 ++++ pom.xml | 2 + 10 files changed, 507 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 195b7fd..452c971 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -17,24 +17,20 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - + <parent> <artifactId>storm</artifactId> <groupId>org.apache.storm</groupId> <version>2.0.0-SNAPSHOT</version> 
<relativePath>../../pom.xml</relativePath> </parent> - + <artifactId>storm-eventhubs</artifactId> <version>2.0.0-SNAPSHOT</version> <packaging>jar</packaging> <name>storm-eventhubs</name> <description>EventHubs Storm Spout</description> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <eventhubs.client.version>0.9.1</eventhubs.client.version> - </properties> <build> <plugins> <plugin> @@ -55,23 +51,23 @@ </transformers> <outputFile>target/${project.artifactId}-${project.version}-jar-with-dependencies.jar</outputFile> </configuration> - </plugin> + </plugin> <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <configuration> - <tasks> - <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <configuration> + <tasks> + <copy file="src/main/resources/config.properties" tofile="target/eventhubs-config.properties" /> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> <dependencies> @@ -104,10 +100,25 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-client</artifactId> + <version>${qpid.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-client-jms</artifactId> + <version>${qpid.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-common</artifactId> + <version>${qpid.version}</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> - </dependencies> + 
</dependencies> </project> http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index ac5018b..3d64cc5 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -36,66 +36,65 @@ import org.apache.storm.tuple.Tuple; * A bolt that writes event message to EventHub. */ public class EventHubBolt extends BaseRichBolt { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(EventHubBolt.class); - - protected OutputCollector collector; - protected EventHubSender sender; - protected EventHubBoltConfig boltConfig; - - - public EventHubBolt(String connectionString, String entityPath) { - boltConfig = new EventHubBoltConfig(connectionString, entityPath); - } + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory + .getLogger(EventHubBolt.class); - public EventHubBolt(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - boltConfig = new EventHubBoltConfig(userName, password, namespace, - entityPath, partitionMode); - } - - public EventHubBolt(EventHubBoltConfig config) { - boltConfig = config; - } + protected OutputCollector collector; + protected EventHubSender sender; + protected EventHubBoltConfig boltConfig; - @Override - public void prepare(Map config, TopologyContext context, OutputCollector collector) { - this.collector = collector; - String myPartitionId = null; - if(boltConfig.getPartitionMode()) { - //We can use the task 
index (starting from 0) as the partition ID - myPartitionId = "" + context.getThisTaskIndex(); - } - logger.info("creating sender: " + boltConfig.getConnectionString() - + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); - try { - EventHubClient eventHubClient = EventHubClient.create( - boltConfig.getConnectionString(), boltConfig.getEntityPath()); - sender = eventHubClient.createPartitionSender(myPartitionId); - } - catch(Exception ex) { - logger.error(ex.getMessage()); - throw new RuntimeException(ex); - } + public EventHubBolt(String connectionString, String entityPath) { + boltConfig = new EventHubBoltConfig(connectionString, entityPath); + } - } + public EventHubBolt(String userName, String password, String namespace, + String entityPath, boolean partitionMode) { + boltConfig = new EventHubBoltConfig(userName, password, namespace, + entityPath, partitionMode); + } - @Override - public void execute(Tuple tuple) { - try { - sender.send(boltConfig.getEventDataFormat().serialize(tuple)); - collector.ack(tuple); - } - catch(EventHubException ex) { - logger.error(ex.getMessage()); - collector.fail(tuple); - } - } + public EventHubBolt(EventHubBoltConfig config) { + boltConfig = config; + } - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } + @Override + public void prepare(Map config, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + String myPartitionId = null; + if (boltConfig.getPartitionMode()) { + // We can use the task index (starting from 0) as the partition ID + myPartitionId = "" + context.getThisTaskIndex(); + } + logger.info("creating sender: " + boltConfig.getConnectionString() + + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); + try { + EventHubClient eventHubClient = EventHubClient.create( + boltConfig.getConnectionString(), + boltConfig.getEntityPath()); + sender = eventHubClient.createPartitionSender(myPartitionId); + } catch (Exception ex) { + 
collector.reportError(ex); + throw new RuntimeException(ex); + } + + } + + @Override + public void execute(Tuple tuple) { + try { + sender.send(boltConfig.getEventDataFormat().serialize(tuple)); + collector.ack(tuple); + } catch (EventHubException ex) { + collector.reportError(ex); + collector.fail(tuple); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } } http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java new file mode 100644 index 0000000..7b0d7e5 --- /dev/null +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java @@ -0,0 +1,66 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ +package org.apache.storm.eventhubs.spout; + +import org.apache.qpid.amqp_1_0.client.Message; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.storm.tuple.Fields; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * An Event Data Scheme which deserializes message payload into the raw bytes. + * + * The resulting tuple would contain two items, the first being the message + * bytes, and the second a map of properties that include metadata, which can be + * used to determine who processes the message, and how it is processed. + */ +public class BinaryEventDataScheme implements IEventDataScheme { + + @Override + public List<Object> deserialize(Message message) { + final List<Object> fieldContents = new ArrayList<Object>(); + + Map metaDataMap = new HashMap(); + byte[] messageData = new byte[0]; + + for (Section section : message.getPayload()) { + if (section instanceof Data) { + Data data = (Data) section; + messageData = data.getValue().getArray(); + } else if (section instanceof ApplicationProperties) { + final ApplicationProperties applicationProperties = (ApplicationProperties) section; + metaDataMap = applicationProperties.getValue(); + } + } + + fieldContents.add(messageData); + fieldContents.add(metaDataMap); + return fieldContents; + } + + @Override + public Fields getOutputFields() { + return new Fields(FieldConstants.Message, FieldConstants.META_DATA); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java ---------------------------------------------------------------------- diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java index 0e275a5..90cad0a 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java @@ -19,37 +19,59 @@ package org.apache.storm.eventhubs.spout; import org.apache.storm.tuple.Fields; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; + import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; import org.apache.qpid.amqp_1_0.type.messaging.Data; +/** + * An Event Data Scheme which deserializes message payload into the Strings. No + * encoding is assumed. The receiver will need to handle parsing of the string + * data in appropriate encoding. + * + * The resulting tuple would contain two items: the message string, and a + * map of properties that include metadata, which can be used to determine who + * processes the message, and how it is processed. + * + * For passing the raw bytes of a message to Bolts, refer to + * {@link BinaryEventDataScheme}. 
+ */ public class EventDataScheme implements IEventDataScheme { - private static final long serialVersionUID = 1L; - - @Override - public List<Object> deserialize(Message message) { - List<Object> fieldContents = new ArrayList<Object>(); - - for (Section section : message.getPayload()) { - if (section instanceof Data) { - Data data = (Data) section; - fieldContents.add(new String(data.getValue().getArray())); - return fieldContents; - } else if (section instanceof AmqpValue) { - AmqpValue amqpValue = (AmqpValue) section; - fieldContents.add(amqpValue.getValue().toString()); - return fieldContents; - } - } - - return null; - } - - @Override - public Fields getOutputFields() { - return new Fields(FieldConstants.Message); - } + private static final long serialVersionUID = 1L; + + @Override + public List<Object> deserialize(Message message) { + final List<Object> fieldContents = new ArrayList<Object>(); + + Map metaDataMap = new HashMap(); + String messageData = ""; + + for (Section section : message.getPayload()) { + if (section instanceof Data) { + Data data = (Data) section; + messageData = new String(data.getValue().getArray()); + } else if (section instanceof AmqpValue) { + AmqpValue amqpValue = (AmqpValue) section; + messageData = amqpValue.getValue().toString(); + } else if (section instanceof ApplicationProperties) { + final ApplicationProperties applicationProperties = (ApplicationProperties) section; + metaDataMap = applicationProperties.getValue(); + } + } + + fieldContents.add(messageData); + fieldContents.add(metaDataMap); + return fieldContents; + } + + @Override + public Fields getOutputFields() { + return new Fields(FieldConstants.Message, FieldConstants.META_DATA); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---------------------------------------------------------------------- diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java index ff40315..662697d 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java @@ -121,8 +121,8 @@ public class EventHubSpout extends BaseRichSpout { zkEndpointAddress = sb.toString(); } stateStore = new ZookeeperStateStore(zkEndpointAddress, - (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES), - (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)); + Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()), + Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString())); } stateStore.open(); @@ -152,7 +152,7 @@ public class EventHubSpout extends BaseRichSpout { try { preparePartitions(config, totalTasks, taskIndex, collector); } catch (Exception e) { - logger.error(e.getMessage()); + collector.reportError(e); throw new RuntimeException(e); } @@ -167,7 +167,7 @@ public class EventHubSpout extends BaseRichSpout { } return concatMetricsDataMaps; } - }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)); + }, Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString())); logger.info("end open()"); } http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index 77cd998..168b134 100755 --- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -23,157 +23,220 @@ import java.util.List; import com.microsoft.eventhubs.client.ConnectionStringBuilder; public class EventHubSpoutConfig implements Serializable { - private static final long serialVersionUID = 1L; - - public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net"; - private final String userName; - private final String password; - private final String namespace; - private final String entityPath; - private final int partitionCount; - - private String zkConnectionString = null; //if null then use zookeeper used by Storm - private int checkpointIntervalInSeconds = 10; - private int receiverCredits = 1024; - private int maxPendingMsgsPerPartition = 1024; - private long enqueueTimeFilter = 0; //timestamp in millisecond, 0 means disabling filter - private String connectionString; - private String topologyName; - private IEventDataScheme scheme = new EventDataScheme(); - private String consumerGroupName = null; //if null then use default consumer group - - //These are mandatory parameters - public EventHubSpoutConfig(String username, String password, String namespace, - String entityPath, int partitionCount) { - this.userName = username; - this.password = password; - this.connectionString = new ConnectionStringBuilder(username, password, - namespace).getConnectionString(); - this.namespace = namespace; - this.entityPath = entityPath; - this.partitionCount = partitionCount; - } - - //Keep this constructor for backward compatibility - public EventHubSpoutConfig(String username, String password, String namespace, - String entityPath, int partitionCount, String zkConnectionString) { - this(username, password, namespace, entityPath, partitionCount); - setZkConnectionString(zkConnectionString); - } - - //Keep this constructor for backward 
compatibility - public EventHubSpoutConfig(String username, String password, String namespace, - String entityPath, int partitionCount, String zkConnectionString, - int checkpointIntervalInSeconds, int receiverCredits) { - this(username, password, namespace, entityPath, partitionCount, - zkConnectionString); - setCheckpointIntervalInSeconds(checkpointIntervalInSeconds); - setReceiverCredits(receiverCredits); - } - - //Keep this constructor for backward compatibility - public EventHubSpoutConfig(String username, String password, String namespace, - String entityPath, int partitionCount, String zkConnectionString, - int checkpointIntervalInSeconds, int receiverCredits, int maxPendingMsgsPerPartition, long enqueueTimeFilter) { - - this(username, password, namespace, entityPath, partitionCount, - zkConnectionString, checkpointIntervalInSeconds, receiverCredits); - setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition); - setEnqueueTimeFilter(enqueueTimeFilter); - } - - public String getNamespace() { - return namespace; - } - - public String getEntityPath() { - return entityPath; - } - - public int getPartitionCount() { - return partitionCount; - } - - public String getZkConnectionString() { - return zkConnectionString; - } - - public void setZkConnectionString(String value) { - zkConnectionString = value; - } - - public int getCheckpointIntervalInSeconds() { - return checkpointIntervalInSeconds; - } - - public void setCheckpointIntervalInSeconds(int value) { - checkpointIntervalInSeconds = value; - } - - public int getReceiverCredits() { - return receiverCredits; - } - - public void setReceiverCredits(int value) { - receiverCredits = value; - } - - public int getMaxPendingMsgsPerPartition() { - return maxPendingMsgsPerPartition; - } - - public void setMaxPendingMsgsPerPartition(int value) { - maxPendingMsgsPerPartition = value; - } - - public long getEnqueueTimeFilter() { - return enqueueTimeFilter; - } - - public void setEnqueueTimeFilter(long value) { - 
enqueueTimeFilter = value; - } - - public String getTopologyName() { - return topologyName; - } - - public void setTopologyName(String value) { - topologyName = value; - } - - public IEventDataScheme getEventDataScheme() { - return scheme; - } - - public void setEventDataScheme(IEventDataScheme scheme) { - this.scheme = scheme; - } - - public String getConsumerGroupName() { - return consumerGroupName; - } - - public void setConsumerGroupName(String value) { - consumerGroupName = value; - } - - public List<String> getPartitionList() { - List<String> partitionList = new ArrayList<String>(); - - for (int i = 0; i < this.partitionCount; i++) { - partitionList.add(Integer.toString(i)); - } - - return partitionList; - } - - public String getConnectionString() { - return connectionString; - } - - public void setTargetAddress(String targetFqnAddress) { - this.connectionString = new ConnectionStringBuilder(userName, password, - namespace, targetFqnAddress).getConnectionString(); - } + private static final long serialVersionUID = 1L; + + public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net"; + private final String userName; + private final String password; + private final String namespace; + private final String entityPath; + private final int partitionCount; + + private String zkConnectionString = null; // if null then use zookeeper used + // by Storm + private int checkpointIntervalInSeconds = 10; + private int receiverCredits = 1024; + private int maxPendingMsgsPerPartition = 1024; + private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means + // disabling filter + private String connectionString; + private String topologyName; + private IEventDataScheme scheme = new StringEventDataScheme(); + private String consumerGroupName = null; // if null then use default + // consumer group + + // These are mandatory parameters + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int 
partitionCount) { + this.userName = username; + this.password = password; + this.connectionString = new ConnectionStringBuilder(username, password, + namespace).getConnectionString(); + this.namespace = namespace; + this.entityPath = entityPath; + this.partitionCount = partitionCount; + } + + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString) { + this(username, password, namespace, entityPath, partitionCount); + setZkConnectionString(zkConnectionString); + } + + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits) { + this(username, password, namespace, entityPath, partitionCount, + zkConnectionString); + setCheckpointIntervalInSeconds(checkpointIntervalInSeconds); + setReceiverCredits(receiverCredits); + } + + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits, long enqueueTimeFilter) { + this(username, password, namespace, entityPath, partitionCount, + zkConnectionString, checkpointIntervalInSeconds, + receiverCredits); + setEnqueueTimeFilter(enqueueTimeFilter); + } + + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits, int maxPendingMsgsPerPartition, + long enqueueTimeFilter) { + + this(username, password, namespace, entityPath, partitionCount, + zkConnectionString, checkpointIntervalInSeconds, + receiverCredits); + 
setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition); + setEnqueueTimeFilter(enqueueTimeFilter); + } + + public String getNamespace() { + return namespace; + } + + public String getEntityPath() { + return entityPath; + } + + public int getPartitionCount() { + return partitionCount; + } + + public String getZkConnectionString() { + return zkConnectionString; + } + + public void setZkConnectionString(String value) { + zkConnectionString = value; + } + + public EventHubSpoutConfig withZkConnectionString(String value) { + setZkConnectionString(value); + return this; + } + + public int getCheckpointIntervalInSeconds() { + return checkpointIntervalInSeconds; + } + + public void setCheckpointIntervalInSeconds(int value) { + checkpointIntervalInSeconds = value; + } + + public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) { + setCheckpointIntervalInSeconds(value); + return this; + } + + public int getReceiverCredits() { + return receiverCredits; + } + + public void setReceiverCredits(int value) { + receiverCredits = value; + } + + public EventHubSpoutConfig withReceiverCredits(int value) { + setReceiverCredits(value); + return this; + } + + public int getMaxPendingMsgsPerPartition() { + return maxPendingMsgsPerPartition; + } + + public void setMaxPendingMsgsPerPartition(int value) { + maxPendingMsgsPerPartition = value; + } + + public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) { + setMaxPendingMsgsPerPartition(value); + return this; + } + + public long getEnqueueTimeFilter() { + return enqueueTimeFilter; + } + + public void setEnqueueTimeFilter(long value) { + enqueueTimeFilter = value; + } + + public EventHubSpoutConfig withEnqueueTimeFilter(long value) { + setEnqueueTimeFilter(value); + return this; + } + + public String getTopologyName() { + return topologyName; + } + + public void setTopologyName(String value) { + topologyName = value; + } + + public EventHubSpoutConfig withTopologyName(String value) { + setTopologyName(value); + 
return this; + } + + public IEventDataScheme getEventDataScheme() { + return scheme; + } + + public void setEventDataScheme(IEventDataScheme scheme) { + this.scheme = scheme; + } + + public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) { + setEventDataScheme(value); + return this; + } + + public String getConsumerGroupName() { + return consumerGroupName; + } + + public void setConsumerGroupName(String value) { + consumerGroupName = value; + } + + public EventHubSpoutConfig withConsumerGroupName(String value) { + setConsumerGroupName(value); + return this; + } + + public List<String> getPartitionList() { + List<String> partitionList = new ArrayList<String>(); + + for (int i = 0; i < this.partitionCount; i++) { + partitionList.add(Integer.toString(i)); + } + + return partitionList; + } + + public String getConnectionString() { + return connectionString; + } + + public void setTargetAddress(String targetFqnAddress) { + this.connectionString = new ConnectionStringBuilder(userName, password, + namespace, targetFqnAddress).getConnectionString(); + } + + public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) { + setTargetAddress(targetFqnAddress); + return this; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java index bd655d6..b238391 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java @@ -22,4 +22,5 @@ public class FieldConstants { public static final String PartitionKey = "partitionKey"; public static final String Offset 
= "offset"; public static final String Message = "message"; + public static final String META_DATA = "metadata"; } http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java index b7e03b4..b8101b9 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java @@ -24,7 +24,20 @@ import org.apache.qpid.amqp_1_0.client.Message; public interface IEventDataScheme extends Serializable { + /** + * Deserialize an AMQP Message into a Tuple. + * + * @see #getOutputFields() for the list of fields the tuple will contain. + * + * @param message The Message to Deserialize. + * @return A tuple containing the deserialized fields of the message. + */ List<Object> deserialize(Message message); + /** + * Retrieve the Fields that are present on tuples created by this object. + * + * @return The Fields that are present on tuples created by this object. 
+ */ Fields getOutputFields(); } http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java new file mode 100644 index 0000000..0c6f8b6 --- /dev/null +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java @@ -0,0 +1,69 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ +package org.apache.storm.eventhubs.spout; + +import org.apache.storm.tuple.Fields; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.amqp_1_0.client.Message; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.storm.tuple.Fields; + +/** + * An Event Data Scheme which deserializes message payload into the Strings. + * No encoding is assumed. The receiver will need to handle parsing of the + * string data in appropriate encoding. + * + * Note: Unlike other schemes provided, this scheme does not include any + * metadata. + * + * For metadata please refer to {@link BinaryEventDataScheme}, {@link EventDataScheme} + */ +public class StringEventDataScheme implements IEventDataScheme { + + private static final long serialVersionUID = 1L; + + @Override + public List<Object> deserialize(Message message) { + final List<Object> fieldContents = new ArrayList<Object>(); + + for (Section section : message.getPayload()) { + if (section instanceof Data) { + Data data = (Data) section; + fieldContents.add(new String(data.getValue().getArray())); + } else if (section instanceof AmqpValue) { + AmqpValue amqpValue = (AmqpValue) section; + fieldContents.add(amqpValue.getValue().toString()); + } + } + + return fieldContents; + } + + @Override + public Fields getOutputFields() { + return new Fields(FieldConstants.Message); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/c8f7a531/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5c47f75..c0985a4 100644 --- a/pom.xml +++ b/pom.xml @@ -285,6 +285,8 @@ <aetherVersion>1.0.0.v20140518</aetherVersion> 
<mavenVersion>3.1.0</mavenVersion> <wagonVersion>1.0</wagonVersion> + <qpid.version>0.32</qpid.version> + <eventhubs.client.version>1.0.1</eventhubs.client.version> </properties> <modules>