Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2588#discussion_r173634265
--- Diff:
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/EventHubReceiverImpl.java
---
@@ -0,0 +1,155 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+
*******************************************************************************/
+package org.apache.storm.eventhubs.core;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.microsoft.azure.eventhubs.EventHubException;
+import org.apache.storm.metric.api.CountMetric;
+import org.apache.storm.metric.api.MeanReducer;
+import org.apache.storm.metric.api.ReducedMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+
+/**
+ * {@link PartitionReceiver} based implementation that receives messages from a
+ * given Eventhub partition.
+ *
+ */
+public class EventHubReceiverImpl implements IEventHubReceiver {
+ private static final Logger logger =
LoggerFactory.getLogger(EventHubReceiverImpl.class);
+
+ private final EventHubConfig eventHubConfig;
+ private final String partitionId;
+
+ private PartitionReceiver receiver;
+ private EventHubClient ehClient;
+ private ExecutorService executorService;
+
+ private ReducedMetric receiveApiLatencyMean;
+ private CountMetric receiveApiCallCount;
+ private CountMetric receiveMessageCount;
+
+ /**
+  * Builds a receiver for a single partition. Only the configuration and the
+  * partition id are stored and the metric objects are created here; nothing
+  * is connected until {@code open} is called.
+  *
+  * @param config
+  *            connection and consumer group settings
+  * @param partitionId
+  *            id of the partition to connect to and read from
+  */
+ public EventHubReceiverImpl(EventHubConfig config, String partitionId) {
+     this.eventHubConfig = config;
+     this.partitionId = partitionId;
+
+     this.receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
+     this.receiveApiCallCount = new CountMetric();
+     this.receiveMessageCount = new CountMetric();
+ }
+
+ @Override
+ public void open(IEventFilter filter) throws IOException,
EventHubException {
+ long start = System.currentTimeMillis();
+ logger.debug(String.format("Creating EventHub Client:
partitionId: %s, filter value:%s, prefetchCount: %s",
+ partitionId, filter.toString(),
String.valueOf(eventHubConfig.getPrefetchCount())));
+ executorService = Executors.newSingleThreadExecutor();
+ ehClient =
EventHubClient.createSync(eventHubConfig.getConnectionString(),
executorService);
+ receiver = PartitionReceiverFactory.createReceiver(ehClient,
filter, eventHubConfig, partitionId);
+ receiver.setPrefetchCount(eventHubConfig.getPrefetchCount());
+ logger.debug("created eventhub receiver, time taken(ms): " +
(System.currentTimeMillis() - start));
+ }
+
+ @Override
+ public void close() {
+ if (receiver == null)
+ return;
+
+ try {
+ receiver.close().whenCompleteAsync((voidargs, error) ->
{
+ try {
+ if (error != null) {
+ logger.error("Exception during
receiver close phase: " + error.toString());
+ }
+ ehClient.closeSync();
+ } catch (Exception e) {
+ logger.error("Exception during ehclient
close phase: " + e.toString());
+ }
+ }).get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.warn("Exception occured during close phase: " +
e.toString());
+ }
+
+ executorService.shutdown();
+
+ logger.info("closed eventhub receiver: partitionId=" +
partitionId);
+ ehClient = null;
+ receiver = null;
+ executorService = null;
+ }
+
+ /**
+  * Reports whether a receiver is currently held.
+  *
+  * @return {@code true} while the {@code receiver} field is non-null
+  */
+ @Override
+ public boolean isOpen() {
+     return receiver != null;
+ }
+
+ /**
+  * Receives a batch using the maximum event count from the configuration.
+  *
+  * @return whatever {@code receive(int)} returns for that batch size
+  */
+ @Override
+ public Iterable<EventData> receive() {
+     final int maxEvents = eventHubConfig.getReceiveEventsMaxCount();
+     return receive(maxEvents);
+ }
+
+ @Override
+ public Iterable<EventData> receive(int batchSize) {
+ long start = System.currentTimeMillis();
+ Iterable<EventData> receivedEvents = null;
+
+ try {
+ receivedEvents = receiver.receiveSync(batchSize);
+ if (receivedEvents != null) {
+ logger.debug("Batchsize: " + batchSize + ",
Received event count: " + Iterables.size(receivedEvents));
+ }
+ } catch (EventHubException e) {
+ logger.error("Exception occured during receive" +
e.toString());
+ return null;
+ }
+ long end = System.currentTimeMillis();
+ long millis = (end - start);
+ receiveApiLatencyMean.update(millis);
+ receiveApiCallCount.incr();
+ return receivedEvents;
--- End diff --
I think it would be safer to return an empty list instead of null when
nothing is received.
---