Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2588#discussion_r173631711 --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java --- @@ -31,116 +30,131 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import java.util.Map; -import java.util.concurrent.ExecutionException; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionSender; +import com.microsoft.azure.eventhubs.EventHubException; /** * A bolt that writes event message to EventHub. + * <p> + * <p> + * The implementation has two modes of operation: + * <ul> + * <li>partitionmode = true, One bolt for per partition write.</li> + * <li>partitionmode = false, use default partitioning key strategy to write to + * partition(s)</li> + * </ul> + * </p> */ public class EventHubBolt extends BaseRichBolt { - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(EventHubBolt.class); - - protected OutputCollector collector; - protected PartitionSender sender; - protected EventHubClient ehClient; - protected EventHubBoltConfig boltConfig; - - public EventHubBolt(String connectionString, String entityPath) { - boltConfig = new EventHubBoltConfig(connectionString, entityPath); - } - - public EventHubBolt(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - boltConfig = new EventHubBoltConfig(userName, password, namespace, - entityPath, partitionMode); - } - - public EventHubBolt(EventHubBoltConfig config) { - boltConfig = config; - } - - @Override - public void prepare(Map<String, Object> config, TopologyContext context, - OutputCollector collector) { - this.collector = collector; - String myPartitionId = null; - if (boltConfig.getPartitionMode()) { 
- // We can use the task index (starting from 0) as the partition ID - myPartitionId = "" + context.getThisTaskIndex(); - } - logger.info("creating sender: " + boltConfig.getConnectionString() - + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); - try { - ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString()); - if (boltConfig.getPartitionMode()) { - sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex())); - } - } catch (Exception ex) { - collector.reportError(ex); - throw new RuntimeException(ex); - } - - } - - @Override - public void execute(Tuple tuple) { - try { - EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple)); - if (boltConfig.getPartitionMode() && sender!=null) { - sender.sendSync(sendEvent); - } - else if (boltConfig.getPartitionMode() && sender==null) { - throw new EventHubException("Sender is null"); - } - else if (!boltConfig.getPartitionMode() && ehClient!=null) { - ehClient.sendSync(sendEvent); - } - else if (!boltConfig.getPartitionMode() && ehClient==null) { - throw new EventHubException("ehclient is null"); - } - collector.ack(tuple); - } catch (EventHubException ex ) { - collector.reportError(ex); - collector.fail(tuple); - } catch (ServiceBusException e) { - collector.reportError(e); - collector.fail(tuple); - } - } - - @Override - public void cleanup() { - if(sender != null) { - try { - sender.close().whenComplete((voidargs,error)->{ - try{ - if(error!=null){ - logger.error("Exception during sender cleanup phase"+error.toString()); - } - ehClient.closeSync(); - }catch (Exception e){ - logger.error("Exception during ehclient cleanup phase"+e.toString()); - } - }).get(); - } catch (InterruptedException e) { - logger.error("Exception occured during cleanup phase"+e.toString()); - } catch (ExecutionException e) { - logger.error("Exception occured during cleanup phase"+e.toString()); - } - logger.info("Eventhub Bolt cleaned up"); - 
sender = null; - ehClient = null; - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } - + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class); + + private ExecutorService executorService; + protected OutputCollector collector; + protected EventHubClient ehClient; + protected PartitionSender sender; + protected EventHubBoltConfig boltConfig; + + /** + * Constructs an instance that uses the specified connection string to connect + * to an EventHub and write to the specified entityPath + * + * @param connectionString EventHub connection String + * @param entityPath entity path to write to + */ + public EventHubBolt(String connectionString, String entityPath) { + boltConfig = new EventHubBoltConfig(connectionString, entityPath); + } + + /** + * Constructs an instance that connects to an EventHub using the specified + * connection credentials. + * + * @param userName UserName to connect as + * @param password Password to use + * @param namespace target namespace for the service bus + * @param entityPath Name of the event hub + * @param partitionMode number of partitions + */ + public EventHubBolt(String userName, String password, String namespace, String entityPath, boolean partitionMode) { + boltConfig = new EventHubBoltConfig(userName, password, namespace, entityPath, partitionMode); + } + + /** + * Constructs an instance using the specified configuration + * + * @param config EventHub connection and partition configuration + */ + public EventHubBolt(EventHubBoltConfig config) { + boltConfig = config; + } + + @Override + public void prepare(Map<String, Object> config, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + logger.info(String.format("Conn String: %s, PartitionMode: %s", boltConfig.getConnectionString(), + String.valueOf(boltConfig.getPartitionMode()))); + try { + executorService = 
Executors.newSingleThreadExecutor(); + ehClient = EventHubClient.createSync(boltConfig.getConnectionString(), executorService); + if (boltConfig.getPartitionMode()) { + String partitionId = String.valueOf(context.getThisTaskIndex()); + logger.info("Writing to partition id: " + partitionId); + sender = ehClient.createPartitionSenderSync(partitionId); + } + } catch (Exception ex) { + collector.reportError(ex); + throw new RuntimeException(ex); + } + } + + @Override + public void execute(Tuple tuple) { + try { + EventData sendEvent = EventData.create(boltConfig.getEventDataFormat().serialize(tuple)); + if (sender == null) { + ehClient.sendSync(sendEvent); + } else { + sender.sendSync(sendEvent); + } + collector.ack(tuple); + } catch (EventHubException e) { + collector.reportError(e); + collector.fail(tuple); + } + } + + @Override + public void cleanup() { + logger.debug("EventHubBolt cleanup"); + try { + ehClient.close().whenCompleteAsync((voidargs, error) -> {
--- End diff --
Nit: Is there any reason to use `whenCompleteAsync` over `whenComplete` here?
---