Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2588#discussion_r173636199 --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java --- @@ -17,246 +17,281 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import com.google.common.base.Strings; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.lang.StringUtils; import org.apache.storm.Config; +import org.apache.storm.eventhubs.core.EventHubConfig; +import org.apache.storm.eventhubs.core.EventHubMessage; +import org.apache.storm.eventhubs.core.EventHubReceiverImpl; +import org.apache.storm.eventhubs.core.FieldConstants; +import org.apache.storm.eventhubs.core.IEventHubReceiver; +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory; +import org.apache.storm.eventhubs.core.IPartitionCoordinator; +import org.apache.storm.eventhubs.core.IPartitionManager; +import org.apache.storm.eventhubs.core.IPartitionManagerFactory; +import org.apache.storm.eventhubs.core.MessageId; +import org.apache.storm.eventhubs.core.PartitionManager; +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator; +import org.apache.storm.eventhubs.state.IStateStore; +import org.apache.storm.eventhubs.state.ZookeeperStateStore; import org.apache.storm.metric.api.IMetric; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -public class EventHubSpout extends BaseRichSpout { - - private static final Logger 
logger = LoggerFactory.getLogger(EventHubSpout.class); - - private final UUID instanceId; - private final EventHubSpoutConfig eventHubConfig; - private final IEventDataScheme scheme; - private final int checkpointIntervalInSeconds; - - private IStateStore stateStore; - private IPartitionCoordinator partitionCoordinator; - private IPartitionManagerFactory pmFactory; - private IEventHubReceiverFactory recvFactory; - private SpoutOutputCollector collector; - private long lastCheckpointTime; - private int currentPartitionIndex = -1; - - public EventHubSpout(String username, String password, String namespace, - String entityPath, int partitionCount) { - this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount)); - } - - public EventHubSpout(EventHubSpoutConfig spoutConfig) { - this(spoutConfig, null, null, null); - } - - public EventHubSpout(EventHubSpoutConfig spoutConfig, - IStateStore store, - IPartitionManagerFactory pmFactory, - IEventHubReceiverFactory recvFactory) { - this.eventHubConfig = spoutConfig; - this.scheme = spoutConfig.getEventDataScheme(); - this.instanceId = UUID.randomUUID(); - this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds(); - this.lastCheckpointTime = System.currentTimeMillis(); - stateStore = store; - this.pmFactory = pmFactory; - if(this.pmFactory == null) { - this.pmFactory = new IPartitionManagerFactory() { - @Override - public IPartitionManager create(EventHubSpoutConfig spoutConfig, - String partitionId, IStateStore stateStore, - IEventHubReceiver receiver) { - return new PartitionManager(spoutConfig, partitionId, - stateStore, receiver); - } - }; +import com.google.common.base.Strings; + +/** + * Emit tuples (messages) from an Azure EventHub + */ +public final class EventHubSpout extends BaseRichSpout { + + private static final long serialVersionUID = -8460916098313963614L; + + private static final Logger LOGGER = LoggerFactory.getLogger(EventHubSpout.class); + + private final 
EventHubSpoutConfig eventHubConfig; + private final int checkpointIntervalInSeconds; + + private IStateStore stateStore; + private IPartitionCoordinator partitionCoordinator; + private IPartitionManagerFactory pmFactory; + private IEventHubReceiverFactory recvFactory; + private SpoutOutputCollector collector; + private long lastCheckpointTime; + private int currentPartitionIndex = -1; + + public EventHubSpout( + final String username, + final String password, + final String namespace, + final String entityPath, + final int partitionCount) { + this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount)); } - this.recvFactory = recvFactory; - if(this.recvFactory == null) { - this.recvFactory = new IEventHubReceiverFactory() { - @Override - public IEventHubReceiver create(EventHubSpoutConfig spoutConfig, - String partitionId) { - return new EventHubReceiverImpl(spoutConfig, partitionId); - } - }; + + public EventHubSpout( + final String username, + final String password, + final String namespace, + final String entityPath, + final int partitionCount, + final int batchSize) { + this(new EventHubSpoutConfig(username, password, namespace, entityPath, partitionCount, batchSize)); } - - } - - /** - * This is a extracted method that is easy to test - * @param config - * @param totalTasks - * @param taskIndex - * @param collector - * @throws Exception - */ - public void preparePartitions(Map<String, Object> config, int totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception { - this.collector = collector; - if(stateStore == null) { - String zkEndpointAddress = eventHubConfig.getZkConnectionString(); - if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) { - //use storm's zookeeper servers if not specified. 
- @SuppressWarnings("unchecked") - List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS); - Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); - StringBuilder sb = new StringBuilder(); - for (String zk : zkServers) { - if (sb.length() > 0) { - sb.append(','); - } - sb.append(zk+":"+zkPort); + + public EventHubSpout(final EventHubSpoutConfig spoutConfig) { + this(spoutConfig, null, null, null); + } + + public EventHubSpout( + final EventHubSpoutConfig spoutConfig, + final IStateStore store, + final IPartitionManagerFactory pmFactory, + final IEventHubReceiverFactory recvFactory) { + this.eventHubConfig = spoutConfig; + this.checkpointIntervalInSeconds = spoutConfig.getCheckpointIntervalInSeconds(); + this.lastCheckpointTime = System.currentTimeMillis(); + stateStore = store; + this.pmFactory = pmFactory; + if (this.pmFactory == null) { + this.pmFactory = new IPartitionManagerFactory() { + private static final long serialVersionUID = -3134660797825594845L; + + @Override + public IPartitionManager create(EventHubConfig ehConfig, String partitionId, IStateStore stateStore, + IEventHubReceiver receiver) { + return new PartitionManager(spoutConfig, partitionId, stateStore, receiver); + } + }; + } + this.recvFactory = recvFactory; + if (this.recvFactory == null) { + this.recvFactory = new IEventHubReceiverFactory() { + + private static final long serialVersionUID = 7215384402396274196L; + + @Override + public IEventHubReceiver create(EventHubConfig spoutConfig, String partitionId) { + return new EventHubReceiverImpl(spoutConfig, partitionId); + } + + }; } - zkEndpointAddress = sb.toString(); - } - stateStore = new ZookeeperStateStore(zkEndpointAddress, - Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()), - Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString())); } - stateStore.open(); - partitionCoordinator = new StaticPartitionCoordinator( - eventHubConfig, 
taskIndex, totalTasks, stateStore, pmFactory, recvFactory); + /** + * This is a extracted method that is easy to test + * + * @param config + * @param totalTasks + * @param taskIndex + * @param collector + * @throws Exception + */ + public void preparePartitions( + final Map config, + final int totalTasks, + final int taskIndex, + final SpoutOutputCollector collector) throws Exception { + this.collector = collector; + if (stateStore == null) { + String zkEndpointAddress = eventHubConfig.getZkConnectionString(); + if (StringUtils.isBlank(zkEndpointAddress)) { + @SuppressWarnings("unchecked") + List<String> zkServers = (List<String>) config.get(Config.STORM_ZOOKEEPER_SERVERS); + Integer zkPort = ((Number) config.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); + zkEndpointAddress = String.join(",", + zkServers.stream().map(x -> x + ":" + zkPort).collect(Collectors.toList())); + } - for (IPartitionManager partitionManager : - partitionCoordinator.getMyPartitionManagers()) { - partitionManager.open(); + stateStore = new ZookeeperStateStore(zkEndpointAddress, + Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()), --- End diff -- I'm pretty sure these properties are Numbers. Consider casting to Number and using intValue(), as is already done for STORM_ZOOKEEPER_PORT a few lines above, instead of calling toString() and re-parsing the result with Integer.parseInt.
---