Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173631711
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---
    @@ -31,116 +30,131 @@
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    -
    -import java.util.Map;
    -import java.util.concurrent.ExecutionException;
    +import com.microsoft.azure.eventhubs.EventData;
    +import com.microsoft.azure.eventhubs.EventHubClient;
    +import com.microsoft.azure.eventhubs.PartitionSender;
    +import com.microsoft.azure.eventhubs.EventHubException;
     
     /**
      * A bolt that writes event message to EventHub.
    + * <p>
    + * <p>
    + * The implementation has two modes of operation:
    + * <ul>
    + * <li>partitionmode = true, One bolt for per partition write.</li>
    + * <li>partitionmode = false, use default partitioning key strategy to 
write to
    + * partition(s)</li>
    + * </ul>
    + * </p>
      */
     public class EventHubBolt extends BaseRichBolt {
    -   private static final long serialVersionUID = 1L;
    -   private static final Logger logger = LoggerFactory
    -                   .getLogger(EventHubBolt.class);
    -
    -   protected OutputCollector collector;
    -   protected PartitionSender sender;
    -   protected EventHubClient ehClient;
    -   protected EventHubBoltConfig boltConfig;
    -
    -   public EventHubBolt(String connectionString, String entityPath) {
    -           boltConfig = new EventHubBoltConfig(connectionString, 
entityPath);
    -   }
    -
    -   public EventHubBolt(String userName, String password, String namespace,
    -                   String entityPath, boolean partitionMode) {
    -           boltConfig = new EventHubBoltConfig(userName, password, 
namespace,
    -                           entityPath, partitionMode);
    -   }
    -
    -   public EventHubBolt(EventHubBoltConfig config) {
    -           boltConfig = config;
    -   }
    -
    -   @Override
    -   public void prepare(Map<String, Object> config, TopologyContext context,
    -                   OutputCollector collector) {
    -           this.collector = collector;
    -           String myPartitionId = null;
    -           if (boltConfig.getPartitionMode()) {
    -                   // We can use the task index (starting from 0) as the 
partition ID
    -                   myPartitionId = "" + context.getThisTaskIndex();
    -           }
    -           logger.info("creating sender: " + 
boltConfig.getConnectionString()
    -                           + ", " + boltConfig.getEntityPath() + ", " + 
myPartitionId);
    -           try {
    -                   ehClient = 
EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
    -                   if (boltConfig.getPartitionMode()) {
    -                           sender = 
ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
    -                   }
    -           } catch (Exception ex) {
    -                   collector.reportError(ex);
    -                   throw new RuntimeException(ex);
    -           }
    -
    -   }
    -
    -   @Override
    -   public void execute(Tuple tuple) {
    -           try {
    -                   EventData sendEvent = new 
EventData(boltConfig.getEventDataFormat().serialize(tuple));
    -                   if (boltConfig.getPartitionMode() && sender!=null) {
    -                           sender.sendSync(sendEvent);
    -                   }
    -                   else if (boltConfig.getPartitionMode() && sender==null) 
{
    -                           throw new EventHubException("Sender is null");
    -                   }
    -                   else if (!boltConfig.getPartitionMode() && 
ehClient!=null) {
    -                           ehClient.sendSync(sendEvent);
    -                   }
    -                   else if (!boltConfig.getPartitionMode() && 
ehClient==null) {
    -                           throw new EventHubException("ehclient is null");
    -                   }
    -                   collector.ack(tuple);
    -           } catch (EventHubException ex ) {
    -                   collector.reportError(ex);
    -                   collector.fail(tuple);
    -           } catch (ServiceBusException e) {
    -                   collector.reportError(e);
    -                   collector.fail(tuple);
    -           }
    -   }
    -
    -   @Override
    -   public void cleanup() {
    -           if(sender != null) {
    -                   try {
    -                           sender.close().whenComplete((voidargs,error)->{
    -                                   try{
    -                                           if(error!=null){
    -                                                   logger.error("Exception 
during sender cleanup phase"+error.toString());
    -                                           }
    -                                           ehClient.closeSync();
    -                                   }catch (Exception e){
    -                                           logger.error("Exception during 
ehclient cleanup phase"+e.toString());
    -                                   }
    -                           }).get();
    -                   } catch (InterruptedException e) {
    -                           logger.error("Exception occured during cleanup 
phase"+e.toString());
    -                   } catch (ExecutionException e) {
    -                           logger.error("Exception occured during cleanup 
phase"+e.toString());
    -                   }
    -                   logger.info("Eventhub Bolt cleaned up");
    -                   sender = null;
    -                   ehClient =  null;
    -           }
    -   }
    -
    -   @Override
    -   public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -
    -   }
    -
    +    private static final long serialVersionUID = 1L;
    +    private static final Logger logger = 
LoggerFactory.getLogger(EventHubBolt.class);
    +
    +    private ExecutorService executorService;
    +    protected OutputCollector collector;
    +    protected EventHubClient ehClient;
    +    protected PartitionSender sender;
    +    protected EventHubBoltConfig boltConfig;
    +
    +    /**
    +     * Constructs an instance that uses the specified connection string to 
connect
    +     * to an EventHub and write to the specified entityPath
    +     *
    +     * @param connectionString EventHub connection String
    +     * @param entityPath       entity path to write to
    +     */
    +    public EventHubBolt(String connectionString, String entityPath) {
    +        boltConfig = new EventHubBoltConfig(connectionString, entityPath);
    +    }
    +
    +    /**
    +     * Constructs an instance that connects to an EventHub using the 
specified
    +     * connection credentials.
    +     *
    +     * @param userName      UserName to connect as
    +     * @param password      Password to use
    +     * @param namespace     target namespace for the service bus
    +     * @param entityPath    Name of the event hub
    +     * @param partitionMode number of partitions
    +     */
    +    public EventHubBolt(String userName, String password, String 
namespace, String entityPath, boolean partitionMode) {
    +        boltConfig = new EventHubBoltConfig(userName, password, namespace, 
entityPath, partitionMode);
    +    }
    +
    +    /**
    +     * Constructs an instance using the specified configuration
    +     *
    +     * @param config EventHub connection and partition configuration
    +     */
    +    public EventHubBolt(EventHubBoltConfig config) {
    +        boltConfig = config;
    +    }
    +
    +    @Override
    +    public void prepare(Map<String, Object> config, TopologyContext 
context,
    +                        OutputCollector collector) {
    +        this.collector = collector;
    +        logger.info(String.format("Conn String: %s, PartitionMode: %s", 
boltConfig.getConnectionString(),
    +                String.valueOf(boltConfig.getPartitionMode())));
    +        try {
    +            executorService = Executors.newSingleThreadExecutor();
    +            ehClient = 
EventHubClient.createSync(boltConfig.getConnectionString(), executorService);
    +            if (boltConfig.getPartitionMode()) {
    +                String partitionId = 
String.valueOf(context.getThisTaskIndex());
    +                logger.info("Writing to partition id: " + partitionId);
    +                sender = ehClient.createPartitionSenderSync(partitionId);
    +            }
    +        } catch (Exception ex) {
    +            collector.reportError(ex);
    +            throw new RuntimeException(ex);
    +        }
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            EventData sendEvent = 
EventData.create(boltConfig.getEventDataFormat().serialize(tuple));
    +            if (sender == null) {
    +                ehClient.sendSync(sendEvent);
    +            } else {
    +                sender.sendSync(sendEvent);
    +            }
    +            collector.ack(tuple);
    +        } catch (EventHubException e) {
    +            collector.reportError(e);
    +            collector.fail(tuple);
    +        }
    +    }
    +
    +    @Override
    +    public void cleanup() {
    +        logger.debug("EventHubBolt cleanup");
    +        try {
    +            ehClient.close().whenCompleteAsync((voidargs, error) -> {
    --- End diff --
    
    Nit: Is there any reason to use whenCompleteAsync rather than whenComplete here?


---

Reply via email to