Fixing stylecheck problems with storm-eventhubs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/18723171 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/18723171 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/18723171 Branch: refs/heads/master Commit: 18723171612bdfe818929297378433a3c069e4e7 Parents: fc1cf09 Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 21:35:57 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 00:22:17 2018 -0400 ---------------------------------------------------------------------- external/storm-eventhubs/pom.xml | 2 +- .../eventhubs/bolt/DefaultEventDataFormat.java | 38 +- .../storm/eventhubs/bolt/EventHubBolt.java | 208 +++++---- .../eventhubs/bolt/EventHubBoltConfig.java | 147 +++--- .../storm/eventhubs/bolt/IEventDataFormat.java | 3 +- .../eventhubs/spout/BinaryEventDataScheme.java | 78 ++-- .../storm/eventhubs/spout/EventDataScheme.java | 92 ++-- .../storm/eventhubs/spout/EventDataWrap.java | 51 ++- .../storm/eventhubs/spout/EventHubFilter.java | 11 +- .../eventhubs/spout/EventHubReceiverImpl.java | 237 +++++----- .../storm/eventhubs/spout/EventHubSpout.java | 420 ++++++++--------- .../eventhubs/spout/EventHubSpoutConfig.java | 451 +++++++++---------- .../storm/eventhubs/spout/FieldConstants.java | 13 +- .../storm/eventhubs/spout/IEventDataScheme.java | 34 +- .../eventhubs/spout/IEventHubReceiver.java | 11 +- .../spout/IEventHubReceiverFactory.java | 27 +- .../eventhubs/spout/IPartitionCoordinator.java | 5 +- .../eventhubs/spout/IPartitionManager.java | 17 +- .../spout/IPartitionManagerFactory.java | 33 +- .../storm/eventhubs/spout/IStateStore.java | 9 +- .../apache/storm/eventhubs/spout/MessageId.java | 69 +-- .../storm/eventhubs/spout/PartitionManager.java | 126 +++--- .../eventhubs/spout/SimplePartitionManager.java | 200 ++++---- .../spout/StaticPartitionCoordinator.java | 92 ++-- .../eventhubs/spout/StringEventDataScheme.java | 78 ++-- .../eventhubs/spout/ZookeeperStateStore.java | 107 ++--- .../storm/eventhubs/trident/Coordinator.java | 65 ++- .../trident/ITridentPartitionManager.java | 26 +- .../ITridentPartitionManagerFactory.java | 4 +- .../trident/OpaqueTridentEventHubEmitter.java | 69 +-- .../trident/OpaqueTridentEventHubSpout.java | 65 ++- .../storm/eventhubs/trident/Partition.java | 23 +- .../storm/eventhubs/trident/Partitions.java | 28 +- .../TransactionalTridentEventHubEmitter.java | 240 +++++----- .../TransactionalTridentEventHubSpout.java | 64 ++- .../trident/TridentPartitionManager.java | 117 ++--- .../eventhubs/samples/AtMostOnceEventCount.java | 44 +- .../storm/eventhubs/samples/EventCount.java | 208 +++++---- .../storm/eventhubs/samples/EventHubLoop.java | 44 +- .../samples/OpaqueTridentEventCount.java | 45 +- .../samples/TransactionalTridentEventCount.java | 86 ++-- .../eventhubs/samples/bolt/GlobalCountBolt.java | 95 ++-- .../samples/bolt/PartialCountBolt.java | 59 ++- .../eventhubs/spout/EventHubReceiverMock.java | 97 ++-- .../spout/EventHubSpoutCallerMock.java | 130 +++--- .../spout/PartitionManagerCallerMock.java | 134 +++--- .../spout/SpoutOutputCollectorMock.java | 76 ++-- .../storm/eventhubs/spout/StateStoreMock.java | 44 +- .../storm/eventhubs/spout/TestEventData.java | 29 +- .../eventhubs/spout/TestEventHubSpout.java | 82 ++-- .../eventhubs/spout/TestPartitionManager.java | 178 ++++---- .../TestTransactionalTridentEmitter.java | 123 +++-- .../eventhubs/trident/TridentCollectorMock.java | 58 +-- 53 files changed, 2386 insertions(+), 2406 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 79b16a9..0c7d3a8 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -53,7 +53,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>1765</maxAllowedViolations> + <maxAllowedViolations>45</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java index d6e1dbc..094ab9a 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/DefaultEventDataFormat.java @@ -15,33 +15,33 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.bolt; import org.apache.storm.tuple.Tuple; /** - * A default implementation of IEventDataFormat that converts the tuple - * into a delimited string. + * A default implementation of IEventDataFormat that converts the tuple into a delimited string. */ public class DefaultEventDataFormat implements IEventDataFormat { - private static final long serialVersionUID = 1L; - private String delimiter = ","; - - public DefaultEventDataFormat withFieldDelimiter(String delimiter) { - this.delimiter = delimiter; - return this; - } + private static final long serialVersionUID = 1L; + private String delimiter = ","; + + public DefaultEventDataFormat withFieldDelimiter(String delimiter) { + this.delimiter = delimiter; + return this; + } - @Override - public byte[] serialize(Tuple tuple) { - StringBuilder sb = new StringBuilder(); - for(Object obj : tuple.getValues()) { - if(sb.length() != 0) { - sb.append(delimiter); - } - sb.append(obj.toString()); + @Override + public byte[] serialize(Tuple tuple) { + StringBuilder sb = new StringBuilder(); + for (Object obj : tuple.getValues()) { + if (sb.length() != 0) { + sb.append(delimiter); + } + sb.append(obj.toString()); + } + return sb.toString().getBytes(); } - return sb.toString().getBytes(); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 7d1aeab..85ffd03 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.bolt; @@ -22,6 +23,8 @@ import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.PartitionSender; import com.microsoft.azure.servicebus.ServiceBusException; +import java.util.Map; +import java.util.concurrent.ExecutionException; import org.apache.storm.eventhubs.spout.EventHubException; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -31,116 +34,107 @@ import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import java.util.Map; -import java.util.concurrent.ExecutionException; - /** * A bolt that writes event message to EventHub. */ public class EventHubBolt extends BaseRichBolt { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(EventHubBolt.class); - - protected OutputCollector collector; - protected PartitionSender sender; - protected EventHubClient ehClient; - protected EventHubBoltConfig boltConfig; - - public EventHubBolt(String connectionString, String entityPath) { - boltConfig = new EventHubBoltConfig(connectionString, entityPath); - } - - public EventHubBolt(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - boltConfig = new EventHubBoltConfig(userName, password, namespace, - entityPath, partitionMode); - } - - public EventHubBolt(EventHubBoltConfig config) { - boltConfig = config; - } - - @Override - public void prepare(Map<String, Object> config, TopologyContext context, - OutputCollector collector) { - this.collector = collector; - String myPartitionId = null; - if (boltConfig.getPartitionMode()) { - // We can use the task index (starting from 0) as the partition ID - myPartitionId = "" + context.getThisTaskIndex(); - } - logger.info("creating sender: " + boltConfig.getConnectionString() - + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); - try { - ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString()); - if (boltConfig.getPartitionMode()) { - sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex())); - } - } catch (Exception ex) { - collector.reportError(ex); - throw new RuntimeException(ex); - } - - } - - @Override - public void execute(Tuple tuple) { - try { - EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple)); - if (boltConfig.getPartitionMode() && sender!=null) { - sender.sendSync(sendEvent); - } - else if (boltConfig.getPartitionMode() && sender==null) { - throw new EventHubException("Sender is null"); - } - else if (!boltConfig.getPartitionMode() && ehClient!=null) { - ehClient.sendSync(sendEvent); - } - else if (!boltConfig.getPartitionMode() && ehClient==null) { - throw new EventHubException("ehclient is null"); - } - collector.ack(tuple); - } catch (EventHubException ex ) { - collector.reportError(ex); - collector.fail(tuple); - } catch (ServiceBusException e) { - collector.reportError(e); - collector.fail(tuple); - } - } - - @Override - public void cleanup() { - if(sender != null) { - try { - sender.close().whenComplete((voidargs,error)->{ - try{ - if(error!=null){ - logger.error("Exception during sender cleanup phase"+error.toString()); - } - ehClient.closeSync(); - }catch (Exception e){ - logger.error("Exception during ehclient cleanup phase"+e.toString()); - } - }).get(); - } catch (InterruptedException e) { - logger.error("Exception occured during cleanup phase"+e.toString()); - } catch (ExecutionException e) { - logger.error("Exception occured during cleanup phase"+e.toString()); - } - logger.info("Eventhub Bolt cleaned up"); - sender = null; - ehClient = null; - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory + .getLogger(EventHubBolt.class); + + protected OutputCollector collector; + protected PartitionSender sender; + protected EventHubClient ehClient; + protected EventHubBoltConfig boltConfig; + + public EventHubBolt(String connectionString, String entityPath) { + boltConfig = new EventHubBoltConfig(connectionString, entityPath); + } + + public EventHubBolt(String userName, String password, String namespace, + String entityPath, boolean partitionMode) { + boltConfig = new EventHubBoltConfig(userName, password, namespace, + entityPath, partitionMode); + } + + public EventHubBolt(EventHubBoltConfig config) { + boltConfig = config; + } + + @Override + public void prepare(Map<String, Object> config, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + String myPartitionId = null; + if (boltConfig.getPartitionMode()) { + // We can use the task index (starting from 0) as the partition ID + myPartitionId = "" + context.getThisTaskIndex(); + } + logger.info("creating sender: " + boltConfig.getConnectionString() + + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); + try { + ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString()); + if (boltConfig.getPartitionMode()) { + sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex())); + } + } catch (Exception ex) { + collector.reportError(ex); + throw new RuntimeException(ex); + } + + } + + @Override + public void execute(Tuple tuple) { + try { + EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple)); + if (boltConfig.getPartitionMode() && sender != null) { + sender.sendSync(sendEvent); + } else if (boltConfig.getPartitionMode() && sender == null) { + throw new EventHubException("Sender is null"); + } else if (!boltConfig.getPartitionMode() && ehClient != null) { + ehClient.sendSync(sendEvent); + } else if (!boltConfig.getPartitionMode() && ehClient == null) { + throw new EventHubException("ehclient is null"); + } + collector.ack(tuple); + } catch (EventHubException ex) { + collector.reportError(ex); + collector.fail(tuple); + } catch (ServiceBusException e) { + collector.reportError(e); + collector.fail(tuple); + } + } + + @Override + public void cleanup() { + if (sender != null) { + try { + sender.close().whenComplete((voidargs, error) -> { + try { + if (error != null) { + logger.error("Exception during sender cleanup phase" + error.toString()); + } + ehClient.closeSync(); + } catch (Exception e) { + logger.error("Exception during ehclient cleanup phase" + e.toString()); + } + }).get(); + } catch (InterruptedException e) { + logger.error("Exception occured during cleanup phase" + e.toString()); + } catch (ExecutionException e) { + logger.error("Exception occured during cleanup phase" + e.toString()); + } + logger.info("Eventhub Bolt cleaned up"); + sender = null; + ehClient = null; + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java index f5e1458..41e39e4 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java @@ -15,97 +15,94 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.bolt; import com.microsoft.azure.servicebus.ConnectionStringBuilder; -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; - -import java.io.Serializable; - import java.io.Serializable; +import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; /* * EventHubs bolt configurations * * Partition mode: - * With partitionMode=true you need to create the same number of tasks as the number of + * With partitionMode=true you need to create the same number of tasks as the number of * EventHubs partitions, and each bolt task will only send data to one partition. * The partition ID is the task ID of the bolt. - * + * * Event format: * The formatter to convert tuple to bytes for EventHubs. * if null, the default format is common delimited tuple fields. */ public class EventHubBoltConfig implements Serializable { - private static final long serialVersionUID = 1L; - - private String connectionString; - private final String entityPath; - protected boolean partitionMode; - protected IEventDataFormat dataFormat; - - public EventHubBoltConfig(String connectionString, String entityPath) { - this(connectionString, entityPath, false, null); - } - - public EventHubBoltConfig(String connectionString, String entityPath, - boolean partitionMode) { - this(connectionString, entityPath, partitionMode, null); - } - - public EventHubBoltConfig(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - this(userName, password, namespace, - EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode); - } - - public EventHubBoltConfig(String connectionString, String entityPath, - boolean partitionMode, IEventDataFormat dataFormat) { - this.connectionString = connectionString; - this.entityPath = entityPath; - this.partitionMode = partitionMode; - this.dataFormat = dataFormat; - if(this.dataFormat == null) { - this.dataFormat = new DefaultEventDataFormat(); + private static final long serialVersionUID = 1L; + private final String entityPath; + protected boolean partitionMode; + protected IEventDataFormat dataFormat; + private String connectionString; + + public EventHubBoltConfig(String connectionString, String entityPath) { + this(connectionString, entityPath, false, null); + } + + public EventHubBoltConfig(String connectionString, String entityPath, + boolean partitionMode) { + this(connectionString, entityPath, partitionMode, null); + } + + public EventHubBoltConfig(String userName, String password, String namespace, + String entityPath, boolean partitionMode) { + this(userName, password, namespace, + EventHubSpoutConfig.EH_SERVICE_FQDN_SUFFIX, entityPath, partitionMode); + } + + public EventHubBoltConfig(String connectionString, String entityPath, + boolean partitionMode, IEventDataFormat dataFormat) { + this.connectionString = connectionString; + this.entityPath = entityPath; + this.partitionMode = partitionMode; + this.dataFormat = dataFormat; + if (this.dataFormat == null) { + this.dataFormat = new DefaultEventDataFormat(); + } } - } - - public EventHubBoltConfig(String userName, String password, String namespace, - String targetFqnAddress, String entityPath) { - this(userName, password, namespace, targetFqnAddress, entityPath, false, null); - } - - public EventHubBoltConfig(String userName, String password, String namespace, - String targetFqnAddress, String entityPath, boolean partitionMode) { - this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null); - } - - public EventHubBoltConfig(String userName, String password, String namespace, - String targetFqnAddress, String entityPath, boolean partitionMode, - IEventDataFormat dataFormat) { - this.connectionString = new ConnectionStringBuilder(namespace,entityPath, - userName,password).toString(); - this.entityPath = entityPath; - this.partitionMode = partitionMode; - this.dataFormat = dataFormat; - if(this.dataFormat == null) { - this.dataFormat = new DefaultEventDataFormat(); + + public EventHubBoltConfig(String userName, String password, String namespace, + String targetFqnAddress, String entityPath) { + this(userName, password, namespace, targetFqnAddress, entityPath, false, null); + } + + public EventHubBoltConfig(String userName, String password, String namespace, + String targetFqnAddress, String entityPath, boolean partitionMode) { + this(userName, password, namespace, targetFqnAddress, entityPath, partitionMode, null); + } + + public EventHubBoltConfig(String userName, String password, String namespace, + String targetFqnAddress, String entityPath, boolean partitionMode, + IEventDataFormat dataFormat) { + this.connectionString = new ConnectionStringBuilder(namespace, entityPath, + userName, password).toString(); + this.entityPath = entityPath; + this.partitionMode = partitionMode; + this.dataFormat = dataFormat; + if (this.dataFormat == null) { + this.dataFormat = new DefaultEventDataFormat(); + } + } + + public String getConnectionString() { + return connectionString; + } + + public String getEntityPath() { + return entityPath; + } + + public boolean getPartitionMode() { + return partitionMode; + } + + public IEventDataFormat getEventDataFormat() { + return dataFormat; } - } - - public String getConnectionString() { - return connectionString; - } - - public String getEntityPath() { - return entityPath; - } - - public boolean getPartitionMode() { - return partitionMode; - } - - public IEventDataFormat getEventDataFormat() { - return dataFormat; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java index d2aacb7..ec09460 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/IEventDataFormat.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.bolt; import java.io.Serializable; @@ -24,5 +25,5 @@ import org.apache.storm.tuple.Tuple; * Serialize a tuple to a byte array to be sent to EventHubs */ public interface IEventDataFormat extends Serializable { - public byte[] serialize(Tuple tuple); + public byte[] serialize(Tuple tuple); } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java index bbd46ea..4c8e0a2 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java @@ -15,60 +15,58 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventData; -import org.apache.storm.tuple.Fields; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An Event Data Scheme which deserializes message payload into the raw bytes. * - * The resulting tuple would contain three items, the first being the message - * bytes, and the second a map of properties that include metadata, which can be - * used to determine who processes the message, and how it is processed.The third is - * the system properties which exposes information like enqueue-time, offset and - * sequence number + * The resulting tuple would contain three items, the first being the message bytes, and the second a map of properties that include + * metadata, which can be used to determine who processes the message, and how it is processed.The third is the system properties which + * exposes information like enqueue-time, offset and sequence number */ public class BinaryEventDataScheme implements IEventDataScheme { - private static final Logger logger = LoggerFactory.getLogger(BinaryEventDataScheme.class); - @Override - public List<Object> deserialize(EventData eventData){ - final List<Object> fieldContents = new ArrayList<Object>(); - byte [] messageData = null; - if (eventData.getBytes() != null) { - messageData = eventData.getBytes(); - } - else if (eventData.getObject()!=null) { - try { - messageData = SerializeDeserializeUtil.serialize(eventData.getObject()); - } catch (IOException e) { - logger.error("Failed to serialize EventData payload class" - + eventData.getObject().getClass()); - logger.error("Exception encountered while serializing EventData payload is" - + e.toString()); - throw new RuntimeException(e); - } - } - Map<String, Object> metaDataMap = eventData.getProperties(); - Map<String, Object> systemMetaDataMap = eventData.getSystemProperties(); - fieldContents.add(messageData); - fieldContents.add(metaDataMap); - fieldContents.add(systemMetaDataMap); - return fieldContents; - } + private static final Logger logger = LoggerFactory.getLogger(BinaryEventDataScheme.class); + + @Override + public List<Object> deserialize(EventData eventData) { + final List<Object> fieldContents = new ArrayList<Object>(); + byte[] messageData = null; + if (eventData.getBytes() != null) { + messageData = eventData.getBytes(); + } else if (eventData.getObject() != null) { + try { + messageData = SerializeDeserializeUtil.serialize(eventData.getObject()); + } catch (IOException e) { + logger.error("Failed to serialize EventData payload class" + + eventData.getObject().getClass()); + logger.error("Exception encountered while serializing EventData payload is" + + e.toString()); + throw new RuntimeException(e); + } + } + Map<String, Object> metaDataMap = eventData.getProperties(); + Map<String, Object> systemMetaDataMap = eventData.getSystemProperties(); + fieldContents.add(messageData); + fieldContents.add(metaDataMap); + fieldContents.add(systemMetaDataMap); + return fieldContents; + } - @Override - public Fields getOutputFields() { - return new Fields(FieldConstants.Message, FieldConstants.META_DATA, - FieldConstants.SYSTEM_META_DATA); - } + @Override + public Fields getOutputFields() { + return new Fields(FieldConstants.Message, FieldConstants.META_DATA, + FieldConstants.SYSTEM_META_DATA); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java index 9fbcecf..9bd0c22 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java @@ -15,64 +15,62 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventData; -import org.apache.storm.tuple.Fields; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * An Event Data Scheme which deserializes message payload into the Strings. No - * encoding is assumed. The receiver will need to handle parsing of the string - * data in appropriate encoding. + * An Event Data Scheme which deserializes message payload into the Strings. No encoding is assumed. The receiver will need to handle + * parsing of the string data in appropriate encoding. * - * The resulting tuple would contain two items: the the message string, and a - * map of properties that include metadata, which can be used to determine who - * processes the message, and how it is processed. - * - * For passing the raw bytes of a messsage to Bolts, refer to - * {@link BinaryEventDataScheme}. + * The resulting tuple would contain two items: the the message string, and a map of properties that include metadata, which can be used to + * determine who processes the message, and how it is processed. + * + * For passing the raw bytes of a messsage to Bolts, refer to {@link BinaryEventDataScheme}. */ public class EventDataScheme implements IEventDataScheme { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class); - @Override - public List<Object> deserialize(EventData eventData) { - final List<Object> fieldContents = new ArrayList<Object>(); - String messageData = ""; - if (eventData.getBytes()!=null) { - messageData = new String(eventData.getBytes()); - } - /*Will only serialize AMQPValue type*/ - else if (eventData.getObject()!=null) { - try { - if (!(eventData.getObject() instanceof List)) { - messageData = eventData.getObject().toString(); - } else { - throw new RuntimeException("Cannot serialize the given AMQP type"); - } - } catch (RuntimeException e) { - logger.error("Failed to serialize EventData payload class" - + eventData.getObject().getClass()); - logger.error("Exception encountered while serializing EventData payload is" - + e.toString()); - throw e; - } - } - Map<String, Object> metaDataMap = eventData.getProperties(); - fieldContents.add(messageData); - fieldContents.add(metaDataMap); - return fieldContents; - } + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class); + + @Override + public List<Object> deserialize(EventData eventData) { + final List<Object> fieldContents = new ArrayList<Object>(); + String messageData = ""; + if (eventData.getBytes() != null) { + messageData = new String(eventData.getBytes()); + } + /*Will only serialize AMQPValue type*/ + else if (eventData.getObject() != null) { + try { + if (!(eventData.getObject() instanceof List)) { + messageData = eventData.getObject().toString(); + } else { + throw new RuntimeException("Cannot serialize the given AMQP type"); + } + } catch (RuntimeException e) { + logger.error("Failed to serialize EventData payload class" + + eventData.getObject().getClass()); + logger.error("Exception encountered while serializing EventData payload is" + + e.toString()); + throw e; + } + } + Map<String, Object> metaDataMap = eventData.getProperties(); + fieldContents.add(messageData); + fieldContents.add(metaDataMap); + return fieldContents; + } - @Override - public Fields getOutputFields() { - return new Fields(FieldConstants.Message, FieldConstants.META_DATA); - } + @Override + public Fields getOutputFields() { + return new Fields(FieldConstants.Message, FieldConstants.META_DATA); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java index 5eeb4d2..fc23c05 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java @@ -15,34 +15,35 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventData; public class EventDataWrap implements Comparable<EventDataWrap> { - private final EventData eventData; - private final MessageId messageId; - - public EventDataWrap(EventData eventdata, MessageId messageId) { - this.eventData = eventdata; - this.messageId = messageId; - } - - public static EventDataWrap create(EventData eventData, MessageId messageId) { - return new EventDataWrap(eventData, messageId); - } - - public EventData getEventData() { - return this.eventData; - } - - public MessageId getMessageId() { - return this.messageId; - } - - @Override - public int compareTo(EventDataWrap ed) { - return messageId.getSequenceNumber(). - compareTo(ed.getMessageId().getSequenceNumber()); - } + private final EventData eventData; + private final MessageId messageId; + + public EventDataWrap(EventData eventdata, MessageId messageId) { + this.eventData = eventdata; + this.messageId = messageId; + } + + public static EventDataWrap create(EventData eventData, MessageId messageId) { + return new EventDataWrap(eventData, messageId); + } + + public EventData getEventData() { + return this.eventData; + } + + public MessageId getMessageId() { + return this.messageId; + } + + @Override + public int compareTo(EventDataWrap ed) { + return messageId.getSequenceNumber(). + compareTo(ed.getMessageId().getSequenceNumber()); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java index a375380..509191d 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubFilter.java @@ -15,32 +15,33 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import java.time.Instant; -public class EventHubFilter implements IEventFilter{ +public class EventHubFilter implements IEventFilter { String offset = null; Instant time = null; - public EventHubFilter(String offset){ + public EventHubFilter(String offset) { this.offset = offset; this.time = null; } - public EventHubFilter(Instant time){ + public EventHubFilter(Instant time) { this.time = time; this.offset = null; } @Override - public String getOffset(){ + public String getOffset() { return offset; } @Override - public Instant getTime(){ + public Instant getTime() { return time; } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java index 459b9bc..83dc850 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java @@ -15,152 +15,149 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.PartitionReceiver; import com.microsoft.azure.servicebus.ServiceBusException; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; import org.apache.storm.metric.api.CountMetric; import org.apache.storm.metric.api.MeanReducer; import org.apache.storm.metric.api.ReducedMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; - public class EventHubReceiverImpl implements IEventHubReceiver { - private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class); + private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class); - private final String connectionString; - private final String entityName; - private final String partitionId; - private final String consumerGroupName; + private final String connectionString; + private final String entityName; + private final String partitionId; + private final String consumerGroupName; - private PartitionReceiver receiver; - private EventHubClient ehClient; - private ReducedMetric receiveApiLatencyMean; - private CountMetric receiveApiCallCount; - private CountMetric receiveMessageCount; + private PartitionReceiver receiver; + private EventHubClient ehClient; + private ReducedMetric receiveApiLatencyMean; + private CountMetric receiveApiCallCount; + private CountMetric receiveMessageCount; - public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) { - this.connectionString = config.getConnectionString(); - this.entityName = config.getEntityPath(); - this.partitionId = partitionId; - this.consumerGroupName = config.getConsumerGroupName(); - receiveApiLatencyMean = new ReducedMetric(new MeanReducer()); - receiveApiCallCount = new CountMetric(); - receiveMessageCount = new CountMetric(); - } + public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) { + this.connectionString = config.getConnectionString(); + this.entityName = config.getEntityPath(); + this.partitionId = partitionId; + this.consumerGroupName = config.getConsumerGroupName(); + receiveApiLatencyMean = new ReducedMetric(new MeanReducer()); + receiveApiCallCount = new CountMetric(); + receiveMessageCount = new CountMetric(); + } - @Override - public void open(IEventFilter filter) throws EventHubException { - logger.info("creating eventhub receiver: partitionId=" + partitionId + - ", filter=" + filter.getOffset() != null ? - filter.getOffset() : Long.toString(filter.getTime().toEpochMilli())); - long start = System.currentTimeMillis(); - try { - ehClient = EventHubClient.createFromConnectionStringSync(connectionString); + @Override + public void open(IEventFilter filter) throws EventHubException { + logger.info("creating eventhub receiver: partitionId=" + partitionId + + ", filter=" + filter.getOffset() != null ? + filter.getOffset() : Long.toString(filter.getTime().toEpochMilli())); + long start = System.currentTimeMillis(); + try { + ehClient = EventHubClient.createFromConnectionStringSync(connectionString); - if (filter.getOffset()!=null) { - receiver = ehClient.createEpochReceiverSync( - consumerGroupName, - partitionId, - filter.getOffset(), - false, - 1); - } - else if (filter.getTime()!=null) { - receiver = ehClient.createEpochReceiverSync( - consumerGroupName, - partitionId, - filter.getTime(), - 1); - } - else{ - throw new RuntimeException("Eventhub receiver must have " + - "an offset or time to be created"); - } - } catch (IOException e) { - logger.error("Exception in creating ehclient"+ e.toString()); - throw new EventHubException(e); - } - catch (ServiceBusException e) { - logger.error("Exception in creating Receiver"+e.toString()); - throw new EventHubException(e); + if (filter.getOffset() != null) { + receiver = ehClient.createEpochReceiverSync( + consumerGroupName, + partitionId, + filter.getOffset(), + false, + 1); + } else if (filter.getTime() != null) { + receiver = ehClient.createEpochReceiverSync( + consumerGroupName, + partitionId, + filter.getTime(), + 1); + } else { + throw new RuntimeException("Eventhub receiver must have " + + "an offset or time to be created"); + } + } catch (IOException e) { + logger.error("Exception in creating ehclient" + e.toString()); + throw new EventHubException(e); + } catch (ServiceBusException e) { + logger.error("Exception in creating Receiver" + e.toString()); + throw new EventHubException(e); + } + long end = System.currentTimeMillis(); + logger.info("created eventhub receiver, time taken(ms): " + (end - start)); } - long end = System.currentTimeMillis(); - logger.info("created eventhub receiver, time taken(ms): " + (end-start)); - } - @Override - public void close() { - if(receiver != null) { - try { - receiver.close().whenComplete((voidargs,error)->{ - try { - if (error!=null) { - logger.error("Exception during receiver close phase"+error.toString()); + @Override + public void close() { + if (receiver != null) { + try { + receiver.close().whenComplete((voidargs, error) -> { + try { + if (error != null) { + logger.error("Exception during receiver close phase" + error.toString()); + } + ehClient.closeSync(); + } catch (Exception e) { + logger.error("Exception during ehclient close phase" + e.toString()); + } + }).get(); + } catch (InterruptedException e) { + logger.error("Exception occured during close phase" + e.toString()); + } catch (ExecutionException e) { + logger.error("Exception occured during close phase" + e.toString()); } - ehClient.closeSync(); - } catch (Exception e) { - logger.error("Exception during ehclient close phase"+e.toString()); - } - }).get(); - } catch (InterruptedException e) { - logger.error("Exception occured during close phase"+e.toString()); - } catch (ExecutionException e) { - logger.error("Exception occured during close phase"+e.toString()); - } - logger.info("closed eventhub receiver: partitionId=" + partitionId ); - receiver = null; - ehClient = null; + logger.info("closed eventhub receiver: partitionId=" + partitionId); + receiver = null; + ehClient = null; + } } - } - - @Override - public boolean isOpen() { - return (receiver != null); - } - @Override - public EventDataWrap receive() { - long start = System.currentTimeMillis(); - Iterable<EventData> receivedEvents=null; - /*Get one message at a time for backward compatibility behaviour*/ - try { - receivedEvents = receiver.receiveSync(1); - } catch (ServiceBusException e) { - logger.error("Exception occured during receive"+e.toString()); - return null; + @Override + public boolean isOpen() { + return (receiver != null); } - long end = System.currentTimeMillis(); - long millis = (end - start); - receiveApiLatencyMean.update(millis); - receiveApiCallCount.incr(); - if (receivedEvents == null || receivedEvents.spliterator().getExactSizeIfKnown() == 0) { - return null; - } - receiveMessageCount.incr(); - EventData receivedEvent = receivedEvents.iterator().next(); - MessageId messageId = new MessageId(partitionId, - receivedEvent.getSystemProperties().getOffset(), - receivedEvent.getSystemProperties().getSequenceNumber()); + @Override + public EventDataWrap receive() { + long start = System.currentTimeMillis(); + Iterable<EventData> receivedEvents = null; + /*Get one message at a time for backward compatibility behaviour*/ + try { + receivedEvents = receiver.receiveSync(1); + } catch (ServiceBusException e) { + logger.error("Exception occured during receive" + e.toString()); + return null; + } + long end = System.currentTimeMillis(); + long millis = (end - start); + receiveApiLatencyMean.update(millis); + receiveApiCallCount.incr(); + + if (receivedEvents == null || receivedEvents.spliterator().getExactSizeIfKnown() == 0) { + return null; + } + receiveMessageCount.incr(); + EventData receivedEvent = receivedEvents.iterator().next(); + MessageId messageId = new MessageId(partitionId, + receivedEvent.getSystemProperties().getOffset(), + receivedEvent.getSystemProperties().getSequenceNumber()); - return EventDataWrap.create(receivedEvent,messageId); - } + return EventDataWrap.create(receivedEvent, messageId); + } - @Override - public Map<String, Object> getMetricsData() { - Map<String, Object> ret = new HashMap<>(); - ret.put(partitionId + "/receiveApiLatencyMean", receiveApiLatencyMean.getValueAndReset()); - ret.put(partitionId + "/receiveApiCallCount", receiveApiCallCount.getValueAndReset()); - ret.put(partitionId + "/receiveMessageCount", receiveMessageCount.getValueAndReset()); - return ret; - } + @Override + public Map<String, Object> getMetricsData() { + Map<String, Object> ret = new HashMap<>(); + ret.put(partitionId + "/receiveApiLatencyMean", receiveApiLatencyMean.getValueAndReset()); + ret.put(partitionId + "/receiveApiCallCount", receiveApiCallCount.getValueAndReset()); + ret.put(partitionId + "/receiveMessageCount", receiveMessageCount.getValueAndReset()); + return ret; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java index d8c3d09..f8e144e 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java @@ -15,9 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.google.common.base.Strings; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.storm.Config; import org.apache.storm.metric.api.IMetric; import org.apache.storm.spout.SpoutOutputCollector; @@ -27,236 +32,233 @@ import org.apache.storm.topology.base.BaseRichSpout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - public class EventHubSpout extends BaseRichSpout { - private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class); - - private final UUID instanceId; - private final EventHubSpoutConfig eventHubConfig; - private final IEventDataScheme scheme; - private final int checkpointIntervalInSeconds; - - private IStateStore stateStore; - private IPartitionCoordinator partitionCoordinator; - private IPartitionManagerFactory pmFactory; - private IEventHubReceiverFactory recvFactory; - private SpoutOutputCollector collector; - private long lastCheckpointTime; - private int currentPartitionIndex = -1; - - public EventHubSpout(String username, String password, String namespace, - String entityPath, int partitionCount) { - this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount)); - } - - public EventHubSpout(EventHubSpoutConfig spoutConfig) { - this(spoutConfig, null, null, null); - } - - public EventHubSpout(EventHubSpoutConfig spoutConfig, - IStateStore store, - IPartitionManagerFactory pmFactory, - IEventHubReceiverFactory recvFactory) { - this.eventHubConfig = spoutConfig; - this.scheme = spoutConfig.getEventDataScheme(); - this.instanceId = UUID.randomUUID(); - this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds(); - this.lastCheckpointTime = System.currentTimeMillis(); - stateStore = store; - this.pmFactory = pmFactory; - if(this.pmFactory == null) { - this.pmFactory = new IPartitionManagerFactory() { - @Override - public IPartitionManager create(EventHubSpoutConfig spoutConfig, - String partitionId, IStateStore stateStore, - IEventHubReceiver receiver) { - return new PartitionManager(spoutConfig, partitionId, - stateStore, receiver); + private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class); + + private final UUID instanceId; + private final EventHubSpoutConfig eventHubConfig; + private final IEventDataScheme scheme; + private final int checkpointIntervalInSeconds; + + private IStateStore stateStore; + private IPartitionCoordinator partitionCoordinator; + private IPartitionManagerFactory pmFactory; + private IEventHubReceiverFactory recvFactory; + private SpoutOutputCollector collector; + private long lastCheckpointTime; + private int currentPartitionIndex = -1; + + public EventHubSpout(String username, String password, String namespace, + String entityPath, int partitionCount) { + this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount)); + } + + public EventHubSpout(EventHubSpoutConfig spoutConfig) { + this(spoutConfig, null, null, null); + } + + public EventHubSpout(EventHubSpoutConfig spoutConfig, + IStateStore store, + IPartitionManagerFactory pmFactory, + IEventHubReceiverFactory recvFactory) { + this.eventHubConfig = spoutConfig; + this.scheme = spoutConfig.getEventDataScheme(); + this.instanceId = UUID.randomUUID(); + this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds(); + this.lastCheckpointTime = System.currentTimeMillis(); + stateStore = store; + this.pmFactory = pmFactory; + if (this.pmFactory == null) { + this.pmFactory = new IPartitionManagerFactory() { + @Override + public IPartitionManager create(EventHubSpoutConfig spoutConfig, + String partitionId, IStateStore stateStore, + IEventHubReceiver receiver) { + return new PartitionManager(spoutConfig, partitionId, + stateStore, receiver); + } + }; + } + this.recvFactory = recvFactory; + if (this.recvFactory == null) { + this.recvFactory = new IEventHubReceiverFactory() { + @Override + public IEventHubReceiver create(EventHubSpoutConfig spoutConfig, + String partitionId) { + return new EventHubReceiverImpl(spoutConfig, partitionId); + } + }; } - }; + } - this.recvFactory = recvFactory; - if(this.recvFactory == null) { - this.recvFactory = new IEventHubReceiverFactory() { - @Override - public IEventHubReceiver create(EventHubSpoutConfig spoutConfig, - String partitionId) { - return new EventHubReceiverImpl(spoutConfig, partitionId); + + /** + * This is a extracted method that is easy to test + * + * @param config + * @param totalTasks + * @param taskIndex + * @param collector + * @throws Exception + */ + public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws + Exception { + this.collector = collector; + if (stateStore == null) { + String zkEndpointAddress = eventHubConfig.getZkConnectionString(); + if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) { + //use storm's zookeeper servers if not specified. + @SuppressWarnings("unchecked") + List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS); + Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); + StringBuilder sb = new StringBuilder(); + for (String zk : zkServers) { + if (sb.length() > 0) { + sb.append(','); + } + sb.append(zk + ":" + zkPort); + } + zkEndpointAddress = sb.toString(); + } + stateStore = new ZookeeperStateStore(zkEndpointAddress, + Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()), + Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString())); + } + stateStore.open(); + + partitionCoordinator = new StaticPartitionCoordinator( + eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, recvFactory); + + for (IPartitionManager partitionManager : + partitionCoordinator.getMyPartitionManagers()) { + partitionManager.open(); } - }; } - - } - - /** - * This is a extracted method that is easy to test - * @param config - * @param totalTasks - * @param taskIndex - * @param collector - * @throws Exception - */ - public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception { - this.collector = collector; - if(stateStore == null) { - String zkEndpointAddress = eventHubConfig.getZkConnectionString(); - if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) { - //use storm's zookeeper servers if not specified. - @SuppressWarnings("unchecked") - List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS); - Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); - StringBuilder sb = new StringBuilder(); - for (String zk : zkServers) { - if (sb.length() > 0) { - sb.append(','); - } - sb.append(zk+":"+zkPort); + + @Override + public void open(Map<String, Object> config, TopologyContext context, SpoutOutputCollector collector) { + logger.info("begin:start open()"); + String topologyName = (String) config.get(Config.TOPOLOGY_NAME); + eventHubConfig.setTopologyName(topologyName); + + int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); + int taskIndex = context.getThisTaskIndex(); + if (totalTasks > eventHubConfig.getPartitionCount()) { + throw new RuntimeException("total tasks of EventHubSpout is greater than partition count."); + } + + logger.info(String.format("topologyName: %s, totalTasks: %d, taskIndex: %d", topologyName, totalTasks, taskIndex)); + + try { + preparePartitions(config, totalTasks, taskIndex, collector); + } catch (Exception e) { + collector.reportError(e); + throw new RuntimeException(e); } - zkEndpointAddress = sb.toString(); - } - stateStore = new ZookeeperStateStore(zkEndpointAddress, - Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()), - Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString())); + + //register metrics + context.registerMetric("EventHubReceiver", new IMetric() { + @Override + public Object getValueAndReset() { + Map<String, Object> concatMetricsDataMaps = new HashMap<>(); + for (IPartitionManager partitionManager : + partitionCoordinator.getMyPartitionManagers()) { + concatMetricsDataMaps.putAll(partitionManager.getMetricsData()); + } + return concatMetricsDataMaps; + } + }, Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString())); + logger.info("end open()"); } - stateStore.open(); - partitionCoordinator = new StaticPartitionCoordinator( - eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, recvFactory); + @Override + public void nextTuple() { + EventDataWrap eventDatawrap = null; + + List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers(); + for (int i = 0; i < partitionManagers.size(); i++) { + currentPartitionIndex = (currentPartitionIndex + 1) % partitionManagers.size(); + IPartitionManager partitionManager = partitionManagers.get(currentPartitionIndex); + + if (partitionManager == null) { + throw new RuntimeException("partitionManager doesn't exist."); + } + + eventDatawrap = partitionManager.receive(); + + if (eventDatawrap != null) { + break; + } + } + + if (eventDatawrap != null) { + MessageId messageId = eventDatawrap.getMessageId(); + List<Object> tuples = scheme.deserialize(eventDatawrap.getEventData()); + if (tuples != null) { + collector.emit(tuples, messageId); + } + } + + checkpointIfNeeded(); - for (IPartitionManager partitionManager : - partitionCoordinator.getMyPartitionManagers()) { - partitionManager.open(); + // We don't need to sleep here because the IPartitionManager.receive() is + // a blocked call so it's fine to call this function in a tight loop. } - } - - @Override - public void open(Map<String, Object> config, TopologyContext context, SpoutOutputCollector collector) { - logger.info("begin:start open()"); - String topologyName = (String) config.get(Config.TOPOLOGY_NAME); - eventHubConfig.setTopologyName(topologyName); - - int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); - int taskIndex = context.getThisTaskIndex(); - if (totalTasks > eventHubConfig.getPartitionCount()) { - throw new RuntimeException("total tasks of EventHubSpout is greater than partition count."); + + @Override + public void ack(Object msgId) { + MessageId messageId = (MessageId) msgId; + IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId()); + String offset = messageId.getOffset(); + partitionManager.ack(offset); } - logger.info(String.format("topologyName: %s, totalTasks: %d, taskIndex: %d", topologyName, totalTasks, taskIndex)); + @Override + public void fail(Object msgId) { + MessageId messageId = (MessageId) msgId; + IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId()); + String offset = messageId.getOffset(); + partitionManager.fail(offset); + } - try { - preparePartitions(config, totalTasks, taskIndex, collector); - } catch (Exception e) { - collector.reportError(e); - throw new RuntimeException(e); + @Override + public void deactivate() { + // let's checkpoint so that we can get the last checkpoint when restarting. + checkpoint(); } - - //register metrics - context.registerMetric("EventHubReceiver", new IMetric() { - @Override - public Object getValueAndReset() { - Map<String, Object> concatMetricsDataMaps = new HashMap<>(); - for (IPartitionManager partitionManager : + + @Override + public void close() { + for (IPartitionManager partitionManager : partitionCoordinator.getMyPartitionManagers()) { - concatMetricsDataMaps.putAll(partitionManager.getMetricsData()); - } - return concatMetricsDataMaps; - } - }, Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString())); - logger.info("end open()"); - } - - @Override - public void nextTuple() { - EventDataWrap eventDatawrap = null; - - List<IPartitionManager> partitionManagers = partitionCoordinator.getMyPartitionManagers(); - for (int i = 0; i < partitionManagers.size(); i++) { - currentPartitionIndex = (currentPartitionIndex + 1) % partitionManagers.size(); - IPartitionManager partitionManager = partitionManagers.get(currentPartitionIndex); - - if (partitionManager == null) { - throw new RuntimeException("partitionManager doesn't exist."); - } - - eventDatawrap = partitionManager.receive(); - - if (eventDatawrap != null) { - break; - } + partitionManager.close(); + } + stateStore.close(); } - if (eventDatawrap != null) { - MessageId messageId = eventDatawrap.getMessageId(); - List<Object> tuples = scheme.deserialize(eventDatawrap.getEventData()); - if (tuples != null) { - collector.emit(tuples, messageId); - } - } - - checkpointIfNeeded(); - - // We don't need to sleep here because the IPartitionManager.receive() is - // a blocked call so it's fine to call this function in a tight loop. - } - - @Override - public void ack(Object msgId) { - MessageId messageId = (MessageId) msgId; - IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId()); - String offset = messageId.getOffset(); - partitionManager.ack(offset); - } - - @Override - public void fail(Object msgId) { - MessageId messageId = (MessageId) msgId; - IPartitionManager partitionManager = partitionCoordinator.getPartitionManager(messageId.getPartitionId()); - String offset = messageId.getOffset(); - partitionManager.fail(offset); - } - - @Override - public void deactivate() { - // let's checkpoint so that we can get the last checkpoint when restarting. - checkpoint(); - } - - @Override - public void close() { - for (IPartitionManager partitionManager : - partitionCoordinator.getMyPartitionManagers()) { - partitionManager.close(); - } - stateStore.close(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (Strings.isNullOrEmpty(eventHubConfig.getOutputStreamId())) { - declarer.declare(scheme.getOutputFields()); - } else { - declarer.declareStream(eventHubConfig.getOutputStreamId(), scheme.getOutputFields()); + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + if (Strings.isNullOrEmpty(eventHubConfig.getOutputStreamId())) { + declarer.declare(scheme.getOutputFields()); + } else { + declarer.declareStream(eventHubConfig.getOutputStreamId(), scheme.getOutputFields()); + } } - } - private void checkpointIfNeeded() { - long nextCheckpointTime = lastCheckpointTime + checkpointIntervalInSeconds * 1000; - if (nextCheckpointTime < System.currentTimeMillis()) { + private void checkpointIfNeeded() { + long nextCheckpointTime = lastCheckpointTime + checkpointIntervalInSeconds * 1000; + if (nextCheckpointTime < System.currentTimeMillis()) { - checkpoint(); - lastCheckpointTime = System.currentTimeMillis(); + checkpoint(); + lastCheckpointTime = System.currentTimeMillis(); + } } - } - - private void checkpoint() { - for (IPartitionManager partitionManager : - partitionCoordinator.getMyPartitionManagers()) { - partitionManager.checkpoint(); + + private void checkpoint() { + for (IPartitionManager partitionManager : + partitionCoordinator.getMyPartitionManagers()) { + partitionManager.checkpoint(); + } } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index 5556970..cd27b11 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -15,242 +15,241 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.servicebus.ConnectionStringBuilder; - import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class EventHubSpoutConfig implements Serializable { - private static final long serialVersionUID = 1L; - - public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net"; - private final String userName; - private final String password; - private final String namespace; - private final String entityPath; - private final int partitionCount; - - private String zkConnectionString = null; // if null then use zookeeper used - // by Storm - private int checkpointIntervalInSeconds = 10; - private int receiverCredits = 1024; - private int maxPendingMsgsPerPartition = 1024; - private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means - // disabling filter - private String connectionString; - private String topologyName; - private IEventDataScheme scheme = new StringEventDataScheme(); - private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; - private String outputStreamId; - - - // These are mandatory parameters - public EventHubSpoutConfig(String username, String password, - String namespace, String entityPath, int partitionCount) { - this.userName = username; - this.password = password; - this.connectionString = new ConnectionStringBuilder(namespace,entityPath, - username,password).toString(); - this.namespace = namespace; - this.entityPath = entityPath; - this.partitionCount = partitionCount; - } - - // Keep this constructor for backward compatibility - public EventHubSpoutConfig(String username, String password, - String namespace, String entityPath, int partitionCount, - String zkConnectionString) { - this(username, password, namespace, entityPath, partitionCount); - setZkConnectionString(zkConnectionString); - } - - // Keep this constructor for backward compatibility - public EventHubSpoutConfig(String username, String password, - String namespace, String entityPath, int partitionCount, - String zkConnectionString, int checkpointIntervalInSeconds, - int receiverCredits) { - this(username, password, namespace, entityPath, partitionCount, - zkConnectionString); - setCheckpointIntervalInSeconds(checkpointIntervalInSeconds); - setReceiverCredits(receiverCredits); - } - - public EventHubSpoutConfig(String username, String password, - String namespace, String entityPath, int partitionCount, - String zkConnectionString, int checkpointIntervalInSeconds, - int receiverCredits, long enqueueTimeFilter) { - this(username, password, namespace, entityPath, partitionCount, - zkConnectionString, checkpointIntervalInSeconds, - receiverCredits); - setEnqueueTimeFilter(enqueueTimeFilter); - } - - // Keep this constructor for backward compatibility - public EventHubSpoutConfig(String username, String password, - String namespace, String entityPath, int partitionCount, - String zkConnectionString, int checkpointIntervalInSeconds, - int receiverCredits, int maxPendingMsgsPerPartition, - long enqueueTimeFilter) { - - this(username, password, namespace, entityPath, partitionCount, - zkConnectionString, checkpointIntervalInSeconds, - receiverCredits); - setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition); - setEnqueueTimeFilter(enqueueTimeFilter); - } - - public String getNamespace() { - return namespace; - } - - public String getEntityPath() { - return entityPath; - } - - public int getPartitionCount() { - return partitionCount; - } - - public String getZkConnectionString() { - return zkConnectionString; - } - - public void setZkConnectionString(String value) { - zkConnectionString = value; - } - - public EventHubSpoutConfig withZkConnectionString(String value) { - setZkConnectionString(value); - return this; - } - - public int getCheckpointIntervalInSeconds() { - return checkpointIntervalInSeconds; - } - - public void setCheckpointIntervalInSeconds(int value) { - checkpointIntervalInSeconds = value; - } - - public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) { - setCheckpointIntervalInSeconds(value); - return this; - } - - public int getReceiverCredits() { - return receiverCredits; - } - - public void setReceiverCredits(int value) { - receiverCredits = value; - } - - public EventHubSpoutConfig withReceiverCredits(int value) { - setReceiverCredits(value); - return this; - } - - public int getMaxPendingMsgsPerPartition() { - return maxPendingMsgsPerPartition; - } - - public void setMaxPendingMsgsPerPartition(int value) { - maxPendingMsgsPerPartition = value; - } - - public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) { - setMaxPendingMsgsPerPartition(value); - return this; - } - - public long getEnqueueTimeFilter() { - return enqueueTimeFilter; - } - - public void setEnqueueTimeFilter(long value) { - enqueueTimeFilter = value; - } - - public EventHubSpoutConfig withEnqueueTimeFilter(long value) { - setEnqueueTimeFilter(value); - return this; - } - - public String getTopologyName() { - return topologyName; - } - - public void setTopologyName(String value) { - topologyName = value; - } - - public EventHubSpoutConfig withTopologyName(String value) { - setTopologyName(value); - return this; - } - - public IEventDataScheme getEventDataScheme() { - return scheme; - } - - public void setEventDataScheme(IEventDataScheme scheme) { - this.scheme = scheme; - } - - public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) { - setEventDataScheme(value); - return this; - } - - public String getConsumerGroupName() { - return consumerGroupName; - } - - public void setConsumerGroupName(String value) { - consumerGroupName = value; - } - - public EventHubSpoutConfig withConsumerGroupName(String value) { - setConsumerGroupName(value); - return this; - } + public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net"; + private static final long serialVersionUID = 1L; + private final String userName; + private final String password; + private final String namespace; + private final String entityPath; + private final int partitionCount; + + private String zkConnectionString = null; // if null then use zookeeper used + // by Storm + private int checkpointIntervalInSeconds = 10; + private int receiverCredits = 1024; + private int maxPendingMsgsPerPartition = 1024; + private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means + // disabling filter + private String connectionString; + private String topologyName; + private IEventDataScheme scheme = new StringEventDataScheme(); + private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; + private String outputStreamId; + + + // These are mandatory parameters + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount) { + this.userName = username; + this.password = password; + this.connectionString = new ConnectionStringBuilder(namespace, entityPath, + username, password).toString(); + this.namespace = namespace; + this.entityPath = entityPath; + this.partitionCount = partitionCount; + } + + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString) { + this(username, password, namespace, entityPath, partitionCount); + setZkConnectionString(zkConnectionString); + } + + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits) { + this(username, password, namespace, entityPath, partitionCount, + zkConnectionString); + setCheckpointIntervalInSeconds(checkpointIntervalInSeconds); + setReceiverCredits(receiverCredits); + } + + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits, long enqueueTimeFilter) { + this(username, password, namespace, entityPath, partitionCount, + zkConnectionString, checkpointIntervalInSeconds, + receiverCredits); + setEnqueueTimeFilter(enqueueTimeFilter); + } + + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits, int maxPendingMsgsPerPartition, + long enqueueTimeFilter) { + + this(username, password, namespace, entityPath, partitionCount, + zkConnectionString, checkpointIntervalInSeconds, + receiverCredits); + setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition); + setEnqueueTimeFilter(enqueueTimeFilter); + } + + public String getNamespace() { + return namespace; + } + + public String getEntityPath() { + return entityPath; + } + + public int getPartitionCount() { + return partitionCount; + } + + public String getZkConnectionString() { + return zkConnectionString; + } + + public void setZkConnectionString(String value) { + zkConnectionString = value; + } + + public EventHubSpoutConfig withZkConnectionString(String value) { + setZkConnectionString(value); + return this; + } + + public int getCheckpointIntervalInSeconds() { + return checkpointIntervalInSeconds; + } + + public void setCheckpointIntervalInSeconds(int value) { + checkpointIntervalInSeconds = value; + } + + public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) { + setCheckpointIntervalInSeconds(value); + return this; + } + + public int getReceiverCredits() { + return receiverCredits; + } + + public void setReceiverCredits(int value) { + receiverCredits = value; + } + + public EventHubSpoutConfig withReceiverCredits(int value) { + setReceiverCredits(value); + return this; + } + + public int getMaxPendingMsgsPerPartition() { + return maxPendingMsgsPerPartition; + } + + public void setMaxPendingMsgsPerPartition(int value) { + maxPendingMsgsPerPartition = value; + } + + public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) { + setMaxPendingMsgsPerPartition(value); + return this; + } + + public long getEnqueueTimeFilter() { + return enqueueTimeFilter; + } + + public void setEnqueueTimeFilter(long value) { + enqueueTimeFilter = value; + } + + public EventHubSpoutConfig withEnqueueTimeFilter(long value) { + setEnqueueTimeFilter(value); + return this; + } + + public String getTopologyName() { + return topologyName; + } + + public void setTopologyName(String value) { + topologyName = value; + } + + public EventHubSpoutConfig withTopologyName(String value) { + setTopologyName(value); + return this; + } + + public IEventDataScheme getEventDataScheme() { + return scheme; + } + + public void setEventDataScheme(IEventDataScheme scheme) { + this.scheme = scheme; + } + + public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) { + setEventDataScheme(value); + return this; + } + + public String getConsumerGroupName() { + return consumerGroupName; + } + + public void setConsumerGroupName(String value) { + consumerGroupName = value; + } + + public EventHubSpoutConfig withConsumerGroupName(String value) { + setConsumerGroupName(value); + return this; + } - public List<String> getPartitionList() { - List<String> partitionList = new ArrayList<String>(); - - for (int i = 0; i < this.partitionCount; i++) { - partitionList.add(Integer.toString(i)); - } - - return partitionList; - } - - public String getConnectionString() { - return connectionString; - } - - /*Keeping it for backward compatibility*/ - public void setTargetAddress(String targetFqnAddress) { - } - - public void setTargetAddress(){ + public List<String> getPartitionList() { + List<String> partitionList = new ArrayList<String>(); + + for (int i = 0; i < this.partitionCount; i++) { + partitionList.add(Integer.toString(i)); + } + + return partitionList; + } + + public String getConnectionString() { + return connectionString; + } + + /*Keeping it for backward compatibility*/ + public void setTargetAddress(String targetFqnAddress) { + } + + public void setTargetAddress() { - } + } - public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) { - setTargetAddress(targetFqnAddress); - return this; - } - - public String getOutputStreamId() { - return outputStreamId; - } - - public void setOutputStreamId(String outputStreamId) { - this.outputStreamId = outputStreamId; - } + public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) { + setTargetAddress(targetFqnAddress); + return this; + } + + public String getOutputStreamId() { + return outputStreamId; + } + + public void setOutputStreamId(String outputStreamId) { + this.outputStreamId = outputStreamId; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java index 88855eb..1baf16f 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java @@ -15,14 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; public class FieldConstants { - public static final String PartitionKey = "partitionKey"; - public static final String Offset = "offset"; - public static final String Message = "message"; - public static final String META_DATA = "metadata"; - public static final String SYSTEM_META_DATA = "eventdata_system_properties"; - public static final String DefaultStartingOffset = "-1"; + public static final String PartitionKey = "partitionKey"; + public static final String Offset = "offset"; + public static final String Message = "message"; + public static final String META_DATA = "metadata"; + public static final String SYSTEM_META_DATA = "eventdata_system_properties"; + public static final String DefaultStartingOffset = "-1"; } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java index 854da6f..f2aa158 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java @@ -15,30 +15,30 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventData; -import org.apache.storm.tuple.Fields; - import java.io.Serializable; import java.util.List; +import org.apache.storm.tuple.Fields; public interface IEventDataScheme extends Serializable { - /** - * Deserialize an AMQP Message into a Tuple. - * - * @see #getOutputFields() for the list of fields the tuple will contain. - * - * @param eventData The EventData to Deserialize. - * @return A tuple containing the deserialized fields of the message. - */ - List<Object> deserialize(EventData eventData); + /** + * Deserialize an AMQP Message into a Tuple. + * + * @param eventData The EventData to Deserialize. + * @return A tuple containing the deserialized fields of the message. + * + * @see #getOutputFields() for the list of fields the tuple will contain. + */ + List<Object> deserialize(EventData eventData); - /** - * Retrieve the Fields that are present on tuples created by this object. - * - * @return The Fields that are present on tuples created by this object. - */ - Fields getOutputFields(); + /** + * Retrieve the Fields that are present on tuples created by this object. + * + * @return The Fields that are present on tuples created by this object. + */ + Fields getOutputFields(); } http://git-wip-us.apache.org/repos/asf/storm/blob/18723171/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java index c8de8bc..4af967f 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java @@ -15,19 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.eventhubs.spout; import java.util.Map; public interface IEventHubReceiver { - void open(IEventFilter filter) throws EventHubException; + void open(IEventFilter filter) throws EventHubException; - void close(); + void close(); - boolean isOpen(); + boolean isOpen(); - EventDataWrap receive(); + EventDataWrap receive(); - Map<String, Object> getMetricsData(); + Map<String, Object> getMetricsData(); }
