Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173636199
  
    --- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
 ---
    @@ -17,246 +17,281 @@
      
*******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.google.common.base.Strings;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.storm.Config;
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.EventHubReceiverImpl;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.core.IEventHubReceiver;
    +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory;
    +import org.apache.storm.eventhubs.core.IPartitionCoordinator;
    +import org.apache.storm.eventhubs.core.IPartitionManager;
    +import org.apache.storm.eventhubs.core.IPartitionManagerFactory;
    +import org.apache.storm.eventhubs.core.MessageId;
    +import org.apache.storm.eventhubs.core.PartitionManager;
    +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator;
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.apache.storm.eventhubs.state.ZookeeperStateStore;
     import org.apache.storm.metric.api.IMetric;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.UUID;
    -
    -public class EventHubSpout extends BaseRichSpout {
    -
    -  private static final Logger logger = 
LoggerFactory.getLogger(EventHubSpout.class);
    -
    -  private final UUID instanceId;
    -  private final EventHubSpoutConfig eventHubConfig;
    -  private final IEventDataScheme scheme;
    -  private final int checkpointIntervalInSeconds;
    -
    -  private IStateStore stateStore;
    -  private IPartitionCoordinator partitionCoordinator;
    -  private IPartitionManagerFactory pmFactory;
    -  private IEventHubReceiverFactory recvFactory;
    -  private SpoutOutputCollector collector;
    -  private long lastCheckpointTime;
    -  private int currentPartitionIndex = -1;
    -
    -  public EventHubSpout(String username, String password, String namespace,
    -      String entityPath, int partitionCount) {
    -    this(new EventHubSpoutConfig(username, password, namespace, 
entityPath, partitionCount));
    -  }
    -
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
    -    this(spoutConfig, null, null, null);
    -  }
    -  
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig,
    -      IStateStore store,
    -      IPartitionManagerFactory pmFactory,
    -      IEventHubReceiverFactory recvFactory) {
    -    this.eventHubConfig = spoutConfig;
    -    this.scheme = spoutConfig.getEventDataScheme();
    -    this.instanceId = UUID.randomUUID();
    -    this.checkpointIntervalInSeconds = 
spoutConfig.getCheckpointIntervalInSeconds();
    -    this.lastCheckpointTime = System.currentTimeMillis();
    -    stateStore = store;
    -    this.pmFactory = pmFactory;
    -    if(this.pmFactory == null) {
    -      this.pmFactory = new IPartitionManagerFactory() {
    -        @Override
    -        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
    -            String partitionId, IStateStore stateStore,
    -            IEventHubReceiver receiver) {
    -          return new PartitionManager(spoutConfig, partitionId,
    -              stateStore, receiver);
    -        }
    -      };
    +import com.google.common.base.Strings;
    +
    +/**
    + * Emit tuples (messages) from an Azure EventHub
    + */
    +public final class EventHubSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = -8460916098313963614L;
    +
    +    private static final Logger LOGGER = 
LoggerFactory.getLogger(EventHubSpout.class);
    +
    +    private final EventHubSpoutConfig eventHubConfig;
    +    private final int checkpointIntervalInSeconds;
    +
    +    private IStateStore stateStore;
    +    private IPartitionCoordinator partitionCoordinator;
    +    private IPartitionManagerFactory pmFactory;
    +    private IEventHubReceiverFactory recvFactory;
    +    private SpoutOutputCollector collector;
    +    private long lastCheckpointTime;
    +    private int currentPartitionIndex = -1;
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount) {
    +        this(new EventHubSpoutConfig(username, password, namespace, 
entityPath, partitionCount));
         }
    -    this.recvFactory = recvFactory;
    -    if(this.recvFactory == null) {
    -      this.recvFactory = new IEventHubReceiverFactory() {
    -        @Override
    -        public IEventHubReceiver create(EventHubSpoutConfig spoutConfig,
    -            String partitionId) {
    -          return new EventHubReceiverImpl(spoutConfig, partitionId);
    -        }
    -      };
    +
    +    public EventHubSpout(
    +            final String username,
    +            final String password,
    +            final String namespace,
    +            final String entityPath,
    +            final int partitionCount,
    +            final int batchSize) {
    +        this(new EventHubSpoutConfig(username, password, namespace, 
entityPath, partitionCount, batchSize));
         }
    -    
    -  }
    -  
    -  /**
    -   * This is a extracted method that is easy to test
    -   * @param config
    -   * @param totalTasks
    -   * @param taskIndex
    -   * @param collector
    -   * @throws Exception
    -   */
    -  public void preparePartitions(Map<String, Object> config, int 
totalTasks, int taskIndex, SpoutOutputCollector collector) throws Exception {
    -    this.collector = collector;
    -    if(stateStore == null) {
    -      String zkEndpointAddress = eventHubConfig.getZkConnectionString();
    -      if (zkEndpointAddress == null || zkEndpointAddress.length() == 0) {
    -        //use storm's zookeeper servers if not specified.
    -        @SuppressWarnings("unchecked")
    -        List<String> zkServers = (List<String>) 
config.get(Config.STORM_ZOOKEEPER_SERVERS);
    -        Integer zkPort = ((Number) 
config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    -        StringBuilder sb = new StringBuilder();
    -        for (String zk : zkServers) {
    -          if (sb.length() > 0) {
    -            sb.append(',');
    -          }
    -          sb.append(zk+":"+zkPort);
    +
    +    public EventHubSpout(final EventHubSpoutConfig spoutConfig) {
    +        this(spoutConfig, null, null, null);
    +    }
    +
    +    public EventHubSpout(
    +            final EventHubSpoutConfig spoutConfig,
    +            final IStateStore store,
    +            final IPartitionManagerFactory pmFactory,
    +            final IEventHubReceiverFactory recvFactory) {
    +        this.eventHubConfig = spoutConfig;
    +        this.checkpointIntervalInSeconds = 
spoutConfig.getCheckpointIntervalInSeconds();
    +        this.lastCheckpointTime = System.currentTimeMillis();
    +        stateStore = store;
    +        this.pmFactory = pmFactory;
    +        if (this.pmFactory == null) {
    +            this.pmFactory = new IPartitionManagerFactory() {
    +                private static final long serialVersionUID = 
-3134660797825594845L;
    +
    +                @Override
    +                public IPartitionManager create(EventHubConfig ehConfig, 
String partitionId, IStateStore stateStore,
    +                                                IEventHubReceiver 
receiver) {
    +                    return new PartitionManager(spoutConfig, partitionId, 
stateStore, receiver);
    +                }
    +            };
    +        }
    +        this.recvFactory = recvFactory;
    +        if (this.recvFactory == null) {
    +            this.recvFactory = new IEventHubReceiverFactory() {
    +
    +                private static final long serialVersionUID = 
7215384402396274196L;
    +
    +                @Override
    +                public IEventHubReceiver create(EventHubConfig 
spoutConfig, String partitionId) {
    +                    return new EventHubReceiverImpl(spoutConfig, 
partitionId);
    +                }
    +
    +            };
             }
    -        zkEndpointAddress = sb.toString();
    -      }
    -      stateStore = new ZookeeperStateStore(zkEndpointAddress,
    -          
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
    -          
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
         }
    -    stateStore.open();
     
    -    partitionCoordinator = new StaticPartitionCoordinator(
    -        eventHubConfig, taskIndex, totalTasks, stateStore, pmFactory, 
recvFactory);
    +    /**
    +     * This is a extracted method that is easy to test
    +     *
    +     * @param config
    +     * @param totalTasks
    +     * @param taskIndex
    +     * @param collector
    +     * @throws Exception
    +     */
    +    public void preparePartitions(
    +            final Map config,
    +            final int totalTasks,
    +            final int taskIndex,
    +            final SpoutOutputCollector collector) throws Exception {
    +        this.collector = collector;
    +        if (stateStore == null) {
    +            String zkEndpointAddress = 
eventHubConfig.getZkConnectionString();
    +            if (StringUtils.isBlank(zkEndpointAddress)) {
    +                @SuppressWarnings("unchecked")
    +                List<String> zkServers = (List<String>) 
config.get(Config.STORM_ZOOKEEPER_SERVERS);
    +                Integer zkPort = ((Number) 
config.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    +                zkEndpointAddress = String.join(",",
    +                        zkServers.stream().map(x -> x + ":" + 
zkPort).collect(Collectors.toList()));
    +            }
     
    -    for (IPartitionManager partitionManager : 
    -      partitionCoordinator.getMyPartitionManagers()) {
    -      partitionManager.open();
    +            stateStore = new ZookeeperStateStore(zkEndpointAddress,
    +                    
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
    --- End diff --
    
    Pretty sure these properties are Numbers. Consider casting to Number and 
using intValue.


---

Reply via email to