GitHub user SreeramGarlapati commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1951#discussion_r102773626
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java ---
    @@ -92,50 +110,34 @@ public boolean isOpen() {
       }
     
       @Override
    -  public EventData receive(long timeoutInMilliseconds) {
    +  public EventData receive() {
         long start = System.currentTimeMillis();
    -    Message message = receiver.receive(timeoutInMilliseconds);
    +    Iterable<com.microsoft.azure.eventhubs.EventData> receivedEvents=null;
    +        /*Get one message at a time for backward compatibility behaviour*/
    +    try {
    +      receivedEvents = receiver.receive(1).get();
    +    }catch (Exception e){
    +      logger.error("Exception occured during receive"+e.toString());
    +    }
         long end = System.currentTimeMillis();
         long millis = (end - start);
         receiveApiLatencyMean.update(millis);
         receiveApiCallCount.incr();
    -    
    -    if (message == null) {
    -      //Temporary workaround for AMQP/EH bug of failing to receive messages
    -      /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
    -        throw new RuntimeException(
    -            "Restart EventHubSpout due to failure of receiving messages in "
    -            + millis + " millisecond");
    -      }*/
    +    if (receivedEvents == null) {
           return null;
         }
    -
         receiveMessageCount.incr();
    -
    -    MessageId messageId = createMessageId(message);
    -    return EventData.create(message, messageId);
    -  }
    -  
    -  private MessageId createMessageId(Message message) {
    -    String offset = null;
    -    long sequenceNumber = 0;
    -
    -    for (Section section : message.getPayload()) {
    -      if (section instanceof MessageAnnotations) {
    -        MessageAnnotations annotations = (MessageAnnotations) section;
    -        HashMap annonationMap = (HashMap) annotations.getValue();
    -
    -        if (annonationMap.containsKey(OffsetKey)) {
    -          offset = (String) annonationMap.get(OffsetKey);
    -        }
    -
    -        if (annonationMap.containsKey(SequenceNumberKey)) {
    -          sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
    -        }
    -      }
    +    MessageId messageId=null;
    +    Message message=null;
    +    for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) {
    --- End diff --
    
    >for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) {
    
    Why? The receive(1) call above explicitly asks for a single message, so this loop is unnecessary. Please remove it.
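
A minimal sketch of what a loop-free version could look like, assuming the batch returned by receive(1) holds at most one event. `FirstEvent` and `firstOrNull` are hypothetical names used only for illustration; they are not part of this PR or of the Event Hubs SDK, and plain strings stand in for the received events.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class FirstEvent {
  /**
   * Hypothetical helper: returns the first element of the batch,
   * or null when the batch is null or empty.
   */
  static <T> T firstOrNull(Iterable<T> batch) {
    if (batch == null) {
      return null;
    }
    Iterator<T> it = batch.iterator();
    return it.hasNext() ? it.next() : null;
  }

  public static void main(String[] args) {
    // Stand-in for the Iterable that receive(1).get() hands back.
    List<String> oneEvent = Collections.singletonList("event-0");
    System.out.println(firstOrNull(oneEvent));
    System.out.println(firstOrNull(Collections.<String>emptyList()));
  }
}
```

Taking the single element through the iterator keeps the null and empty checks in one place, so the caller can return null early instead of iterating.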


