Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2588#discussion_r173633145
--- Diff:
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubConfig.java
---
@@ -0,0 +1,342 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *******************************************************************************/
+package org.apache.storm.eventhubs.core;
+
+import java.io.Serializable;
+
+import org.apache.storm.eventhubs.format.EventHubMessageDataScheme;
+import org.apache.storm.eventhubs.format.IEventDataScheme;
+import org.apache.storm.eventhubs.format.StringEventDataScheme;
+
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
+
+/**
+ * Captures connection details for EventHub
+ */
+public class EventHubConfig implements Serializable {
+ private static final long serialVersionUID = -2913928074769667240L;
+ protected String userName;
+ protected String password;
+ protected String namespace;
+ protected String entityPath;
+ protected int partitionCount;
+ protected String zkConnectionString = null;
+ protected int checkpointIntervalInSeconds = 10;
+ protected int receiverCredits = 1024;
+ protected int maxPendingMsgsPerPartition =
FieldConstants.DEFAULT_MAX_PENDING_PER_PARTITION;
+ protected int receiveEventsMaxCount =
FieldConstants.DEFAULT_RECEIVE_MAX_CAP;
+ protected int prefetchCount = FieldConstants.DEFAULT_PREFETCH_COUNT;
+ protected long enqueueTimeFilter = 0;
+ protected String connectionString;
+ protected String topologyName;
+ protected IEventDataScheme eventDataScheme = new
StringEventDataScheme();
+ protected String consumerGroupName =
EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
+
+ public EventHubConfig(String namespace, String entityPath, String
userName, String password, int partitionCount) {
+ this.namespace = namespace;
+ this.entityPath = entityPath;
+ this.userName = userName;
+ this.password = password;
+ this.partitionCount = partitionCount;
+ this.connectionString = new ConnectionStringBuilder()
+ .setNamespaceName(namespace)
+ .setEventHubName(entityPath)
+ .setSasKeyName(userName)
+ .setSasKey(password)
+ .toString();
+ }
+
+ /**
+ * Returns username used in credentials provided to EventHub
+ *
+ * @return username
+ */
+ public String getUserName() {
+ return userName;
+ }
+
+ /**
+ * Returns password used in credentials provided to EventHub
+ *
+ * @return password
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Returns servicebus namespace used when connecting to EventHub
+ *
+ * @return servicebus namespace
+ */
+ public String getNamespace() {
+ return namespace;
+ }
+
+ /**
+ * Returns name of the EventHub
+ *
+ * @return EventHub name
+ */
+ public String getEntityPath() {
+ return entityPath;
+ }
+
+ /**
+ * Returns specified partition count on the EventHub
+ *
+ * @return partition count
+ */
+ public int getPartitionCount() {
+ return partitionCount;
+ }
+
+ /**
+ * Sets the zookeeper connection string. (Example:
+ * zk1-clusterfqdn:2181,zk2-clusterfqdn:2181)
+ *
+ * @param zkConnectionString Zookeeper connection string
+ */
+ public void setZkConnectionString(String zkConnectionString) {
+ this.zkConnectionString = zkConnectionString;
+ }
+
+ /**
+ * Returns the configured zookeeper connection string.
+ */
+ public String getZkConnectionString() {
+ return zkConnectionString;
+ }
+
+ /**
+ * Returns the specified frequency interval at which checkpoint
information is
+ * persisted.
+ *
+ * @return checkpoint interval
+ */
+ public int getCheckpointIntervalInSeconds() {
+ return checkpointIntervalInSeconds;
+ }
+
+ /**
+ * Sets the frequency with which checkpoint information is persisted to
+ * zookeeper
+ *
+ * @param checkpointIntervalInSeconds
+ */
+ public void setCheckpointIntervalInSeconds(int
checkpointIntervalInSeconds) {
+ this.checkpointIntervalInSeconds = checkpointIntervalInSeconds;
+ }
+
+ /**
+ * Returns the configured receiver credits used when connecting to EventHub.
+ * <p>
+ * Note: This is a legacy setting that will soon be deprecated. Please use
+ * {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead.
+ * </p>
+ *
+ * @return receiver credits
+ * @deprecated use {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead
+ */
+ public int getReceiverCredits() {
+ return receiverCredits;
+ }
+
+ /**
+ * Configures receiver credits used when connecting to EventHub.
+ * <p>
+ * Note: This is a legacy setting that will soon be deprecated. Please use
+ * {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead.
+ * </p>
+ *
+ * @param receiverCredits receiver credits
+ * @deprecated use {@link EventHubConfig#setReceiveEventsMaxCount(int)} instead
+ */
+ public void setReceiverCredits(int receiverCredits) {
+ this.receiverCredits = receiverCredits;
+ }
+
+ /**
+ * Returns the configured size of the pending queue for each partition.
+ * While the pending queue is at full capacity no new receive calls will be made
+ * to EventHub. The default value is
+ * {@link FieldConstants#DEFAULT_MAX_PENDING_PER_PARTITION}.
+ *
+ * @return maximum number of pending messages per partition
+ */
+ public int getMaxPendingMsgsPerPartition() {
+ return maxPendingMsgsPerPartition;
+ }
+
+ /**
+ * Configures the size of the pending queue for each partition. While the
+ * pending queue is at full capacity no new receive calls will be made to
+ * EventHub. The default value is
+ * {@link FieldConstants#DEFAULT_MAX_PENDING_PER_PARTITION}.
+ *
+ * @param maxPendingMsgsPerPartition maximum number of pending messages per partition
+ */
+ public void setMaxPendingMsgsPerPartition(int
maxPendingMsgsPerPartition) {
+ this.maxPendingMsgsPerPartition = maxPendingMsgsPerPartition;
+ }
+
+ /**
+ * Returns the configured upper limit on number of events that can be received
+ * from EventHub per call. Default is
+ * {@link FieldConstants#DEFAULT_RECEIVE_MAX_CAP}.
+ *
+ * @return upper limit on events received per call
+ */
+ public int getReceiveEventsMaxCount() {
+ return receiveEventsMaxCount;
+ }
+
+ /**
+ * Configures the upper limit on number of events that can be received from
+ * EventHub per call. Default is {@link FieldConstants#DEFAULT_RECEIVE_MAX_CAP}.
+ * <p>
+ * Setting this to a value greater than one will reduce the number of calls that
+ * are made to EventHub. The received events are buffered in an internal cache
+ * and fed to the spout during the nextTuple call.
+ * </p>
+ *
+ * @param receiveEventsMaxCount upper limit on events received per call
+ */
+ public void setReceiveEventsMaxCount(int receiveEventsMaxCount) {
+ this.receiveEventsMaxCount = receiveEventsMaxCount;
+ }
+
+ /**
+ * Returns the configured value for the TimeBased filter for when to
start
+ * receiving events from.
+ *
+ * @return
+ */
+ public long getEnqueueTimeFilter() {
+ return enqueueTimeFilter;
+ }
+
+ /**
+ * Configures value for the TimeBased filter for when to start
receiving events
+ * from.
+ *
+ * @param enqueueTimeFilter
+ */
+ public void setEnqueueTimeFilter(long enqueueTimeFilter) {
+ this.enqueueTimeFilter = enqueueTimeFilter;
+ }
+
+ /**
+ * Returns the connection string used when talking to EventHub
+ *
+ * @return
+ */
+ public String getConnectionString() {
+ return connectionString;
+ }
+
+ /**
+ * Configures the connection string to be used when talking to EventHub
+ *
+ * @param connectionString
+ */
+ public void setConnectionString(String connectionString) {
+ this.connectionString = connectionString;
+ }
+
+ /**
+ * Returns the name of the topology.
+ *
+ * @return topology name
+ */
+ public String getTopologyName() {
+ return topologyName;
+ }
+
+ /**
+ * Name of the topology
+ *
+ * @param topologyName
+ */
+ public void setTopologyName(String topologyName) {
+ this.topologyName = topologyName;
+ }
+
+ /**
+ * Returns the configured Serialization/Deserialization scheme in use.
+ * <p>
+ * Please refer to {@link IEventDataScheme} for implementation choices.
+ * </p>
+ *
+ * @return
+ */
+ public IEventDataScheme getEventDataScheme() {
+ return eventDataScheme;
+ }
+
+ /**
+ * Configures Serialization/Deserialization scheme in use.
+ * <p>
+ * Please refer to {@link IEventDataScheme} for implementation choices.
+ * </p>
+ *
+ * @param scheme
+ */
+ public void setEventDataScheme(IEventDataScheme scheme) {
+ this.eventDataScheme = scheme;
+ }
+
+ /**
+ * Consumer group name to use when receiveing events from EventHub
--- End diff --
Nit: Receiveing -> receiving
---