Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2588#discussion_r173631260
--- Diff:
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
---
@@ -31,116 +30,131 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.eventhubs.EventHubException;
/**
* A bolt that writes event message to EventHub.
+ * <p>
+ * <p>
+ * The implementation has two modes of operation:
+ * <ul>
+ * <li>partitionmode = true, One bolt for per partition write.</li>
+ * <li>partitionmode = false, use default partitioning key strategy to
write to
+ * partition(s)</li>
+ * </ul>
+ * </p>
*/
public class EventHubBolt extends BaseRichBolt {
- private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory
- .getLogger(EventHubBolt.class);
-
- protected OutputCollector collector;
- protected PartitionSender sender;
- protected EventHubClient ehClient;
- protected EventHubBoltConfig boltConfig;
-
- public EventHubBolt(String connectionString, String entityPath) {
- boltConfig = new EventHubBoltConfig(connectionString,
entityPath);
- }
-
- public EventHubBolt(String userName, String password, String namespace,
- String entityPath, boolean partitionMode) {
- boltConfig = new EventHubBoltConfig(userName, password,
namespace,
- entityPath, partitionMode);
- }
-
- public EventHubBolt(EventHubBoltConfig config) {
- boltConfig = config;
- }
-
- @Override
- public void prepare(Map<String, Object> config, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
- String myPartitionId = null;
- if (boltConfig.getPartitionMode()) {
- // We can use the task index (starting from 0) as the
partition ID
- myPartitionId = "" + context.getThisTaskIndex();
- }
- logger.info("creating sender: " +
boltConfig.getConnectionString()
- + ", " + boltConfig.getEntityPath() + ", " +
myPartitionId);
- try {
- ehClient =
EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString());
- if (boltConfig.getPartitionMode()) {
- sender =
ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex()));
- }
- } catch (Exception ex) {
- collector.reportError(ex);
- throw new RuntimeException(ex);
- }
-
- }
-
- @Override
- public void execute(Tuple tuple) {
- try {
- EventData sendEvent = new
EventData(boltConfig.getEventDataFormat().serialize(tuple));
- if (boltConfig.getPartitionMode() && sender!=null) {
- sender.sendSync(sendEvent);
- }
- else if (boltConfig.getPartitionMode() && sender==null)
{
- throw new EventHubException("Sender is null");
- }
- else if (!boltConfig.getPartitionMode() &&
ehClient!=null) {
- ehClient.sendSync(sendEvent);
- }
- else if (!boltConfig.getPartitionMode() &&
ehClient==null) {
- throw new EventHubException("ehclient is null");
- }
- collector.ack(tuple);
- } catch (EventHubException ex ) {
- collector.reportError(ex);
- collector.fail(tuple);
- } catch (ServiceBusException e) {
- collector.reportError(e);
- collector.fail(tuple);
- }
- }
-
- @Override
- public void cleanup() {
- if(sender != null) {
- try {
- sender.close().whenComplete((voidargs,error)->{
- try{
- if(error!=null){
- logger.error("Exception
during sender cleanup phase"+error.toString());
- }
- ehClient.closeSync();
- }catch (Exception e){
- logger.error("Exception during
ehclient cleanup phase"+e.toString());
- }
- }).get();
- } catch (InterruptedException e) {
- logger.error("Exception occured during cleanup
phase"+e.toString());
- } catch (ExecutionException e) {
- logger.error("Exception occured during cleanup
phase"+e.toString());
- }
- logger.info("Eventhub Bolt cleaned up");
- sender = null;
- ehClient = null;
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger =
LoggerFactory.getLogger(EventHubBolt.class);
+
+ private ExecutorService executorService;
+ protected OutputCollector collector;
+ protected EventHubClient ehClient;
+ protected PartitionSender sender;
+ protected EventHubBoltConfig boltConfig;
+
+ /**
+ * Constructs an instance that uses the specified connection string to
connect
+ * to an EventHub and write to the specified entityPath
+ *
+ * @param connectionString EventHub connection String
+ * @param entityPath entity path to write to
+ */
+ public EventHubBolt(String connectionString, String entityPath) {
+ boltConfig = new EventHubBoltConfig(connectionString, entityPath);
+ }
+
+ /**
+ * Constructs an instance that connects to an EventHub using the
specified
+ * connection credentials.
+ *
+ * @param userName UserName to connect as
+ * @param password Password to use
+ * @param namespace target namespace for the service bus
+ * @param entityPath Name of the event hub
+ * @param partitionMode number of partitions
--- End diff --
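This `@param partitionMode` description might need to be updated: `partitionMode` is a boolean flag that selects per-partition sending, not the number of partitions.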
---