Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635852
  
    --- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
 ---
    @@ -17,246 +17,281 @@
      
*******************************************************************************/
     package org.apache.storm.eventhubs.spout;
     
    -import com.google.common.base.Strings;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.storm.Config;
    +import org.apache.storm.eventhubs.core.EventHubConfig;
    +import org.apache.storm.eventhubs.core.EventHubMessage;
    +import org.apache.storm.eventhubs.core.EventHubReceiverImpl;
    +import org.apache.storm.eventhubs.core.FieldConstants;
    +import org.apache.storm.eventhubs.core.IEventHubReceiver;
    +import org.apache.storm.eventhubs.core.IEventHubReceiverFactory;
    +import org.apache.storm.eventhubs.core.IPartitionCoordinator;
    +import org.apache.storm.eventhubs.core.IPartitionManager;
    +import org.apache.storm.eventhubs.core.IPartitionManagerFactory;
    +import org.apache.storm.eventhubs.core.MessageId;
    +import org.apache.storm.eventhubs.core.PartitionManager;
    +import org.apache.storm.eventhubs.core.StaticPartitionCoordinator;
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.apache.storm.eventhubs.state.ZookeeperStateStore;
     import org.apache.storm.metric.api.IMetric;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Fields;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.HashMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.UUID;
    -
    -public class EventHubSpout extends BaseRichSpout {
    -
    -  private static final Logger logger = 
LoggerFactory.getLogger(EventHubSpout.class);
    -
    -  private final UUID instanceId;
    -  private final EventHubSpoutConfig eventHubConfig;
    -  private final IEventDataScheme scheme;
    -  private final int checkpointIntervalInSeconds;
    -
    -  private IStateStore stateStore;
    -  private IPartitionCoordinator partitionCoordinator;
    -  private IPartitionManagerFactory pmFactory;
    -  private IEventHubReceiverFactory recvFactory;
    -  private SpoutOutputCollector collector;
    -  private long lastCheckpointTime;
    -  private int currentPartitionIndex = -1;
    -
    -  public EventHubSpout(String username, String password, String namespace,
    -      String entityPath, int partitionCount) {
    -    this(new EventHubSpoutConfig(username, password, namespace, 
entityPath, partitionCount));
    -  }
    -
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig) {
    -    this(spoutConfig, null, null, null);
    -  }
    -  
    -  public EventHubSpout(EventHubSpoutConfig spoutConfig,
    -      IStateStore store,
    -      IPartitionManagerFactory pmFactory,
    -      IEventHubReceiverFactory recvFactory) {
    -    this.eventHubConfig = spoutConfig;
    -    this.scheme = spoutConfig.getEventDataScheme();
    -    this.instanceId = UUID.randomUUID();
    -    this.checkpointIntervalInSeconds = 
spoutConfig.getCheckpointIntervalInSeconds();
    -    this.lastCheckpointTime = System.currentTimeMillis();
    -    stateStore = store;
    -    this.pmFactory = pmFactory;
    -    if(this.pmFactory == null) {
    -      this.pmFactory = new IPartitionManagerFactory() {
    -        @Override
    -        public IPartitionManager create(EventHubSpoutConfig spoutConfig,
    -            String partitionId, IStateStore stateStore,
    -            IEventHubReceiver receiver) {
    -          return new PartitionManager(spoutConfig, partitionId,
    -              stateStore, receiver);
    -        }
    -      };
    +import com.google.common.base.Strings;
    +
    +/**
    + * Emit tuples (messages) from an Azure EventHub
    + */
    +public final class EventHubSpout extends BaseRichSpout {
    +
    +    private static final long serialVersionUID = -8460916098313963614L;
    +
    +    private static final Logger LOGGER = 
LoggerFactory.getLogger(EventHubSpout.class);
    +
    +    private final EventHubSpoutConfig eventHubConfig;
    +    private final int checkpointIntervalInSeconds;
    +
    +    private IStateStore stateStore;
    --- End diff --
    
    Nit: Consider declaring the fields that are not serialized as "transient", 
for readability.


---

Reply via email to