GitHub user SreeramGarlapati commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1951#discussion_r102774770
  
    --- Diff: external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java ---
    @@ -92,50 +110,34 @@ public boolean isOpen() {
       }
     
       @Override
    -  public EventData receive(long timeoutInMilliseconds) {
    +  public EventData receive() {
         long start = System.currentTimeMillis();
    -    Message message = receiver.receive(timeoutInMilliseconds);
    +    Iterable<com.microsoft.azure.eventhubs.EventData> receivedEvents=null;
    +        /*Get one message at a time for backward compatibility behaviour*/
    +    try {
    +      receivedEvents = receiver.receive(1).get();
    +    }catch (Exception e){
    +      logger.error("Exception occured during receive"+e.toString());
    +    }
         long end = System.currentTimeMillis();
         long millis = (end - start);
         receiveApiLatencyMean.update(millis);
         receiveApiCallCount.incr();
    -    
    -    if (message == null) {
    -      //Temporary workaround for AMQP/EH bug of failing to receive messages
    -      /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
    -        throw new RuntimeException(
    -            "Restart EventHubSpout due to failure of receiving messages in "
    -            + millis + " millisecond");
    -      }*/
    +    if (receivedEvents == null) {
           return null;
         }
    -
         receiveMessageCount.incr();
    -
    -    MessageId messageId = createMessageId(message);
    -    return EventData.create(message, messageId);
    -  }
    -  
    -  private MessageId createMessageId(Message message) {
    -    String offset = null;
    -    long sequenceNumber = 0;
    -
    -    for (Section section : message.getPayload()) {
    -      if (section instanceof MessageAnnotations) {
    -        MessageAnnotations annotations = (MessageAnnotations) section;
    -        HashMap annonationMap = (HashMap) annotations.getValue();
    -
    -        if (annonationMap.containsKey(OffsetKey)) {
    -          offset = (String) annonationMap.get(OffsetKey);
    -        }
    -
    -        if (annonationMap.containsKey(SequenceNumberKey)) {
    -          sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
    -        }
    -      }
    +    MessageId messageId=null;
    +    Message message=null;
    +    for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) {
    +      messageId = new MessageId(partitionId,
    +              receivedEvent.getSystemProperties().getOffset(),
    +              receivedEvent.getSystemProperties().getSequenceNumber());
    +      List<Section> body = new ArrayList<Section>();
    +      body.add(new Data(new Binary((new String(receivedEvent.getBody(), Charset.defaultCharset())).getBytes())));
    +      message = new Message(body);
    --- End diff --
    
    This is not equivalent to the existing behavior. The original Message could
    carry other AMQP sections besides the body, such as ApplicationProperties and
    SystemProperties, and rebuilding it from the body alone drops them.
    Ideally, receive() should return com.microsoft.azure.eventhubs.EventData
    directly. Please remove the EventData type created in the spout library to
    eliminate confusion between the two.
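
To illustrate the concern, here is a minimal, self-contained sketch. It does not use the real Event Hubs SDK: `ReceivedEvent`, `rebuildFromBody`, `passThrough`, and `sampleEvent` are hypothetical stand-ins invented for this example, and the property name and values are made up. It only shows, under those assumptions, how copying the body into a fresh object can lose application properties that a pass-through would keep.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PassThroughSketch {

    /** Hypothetical stand-in for a received event; NOT the real SDK EventData class. */
    static final class ReceivedEvent {
        final byte[] body;
        final Map<String, Object> applicationProperties;
        final String offset;
        final long sequenceNumber;

        ReceivedEvent(byte[] body, Map<String, Object> applicationProperties,
                      String offset, long sequenceNumber) {
            this.body = body;
            this.applicationProperties = applicationProperties;
            this.offset = offset;
            this.sequenceNumber = sequenceNumber;
        }
    }

    /**
     * Roughly mirrors the approach in the diff: body, offset, and sequence
     * number are carried over, but application properties are not.
     */
    static ReceivedEvent rebuildFromBody(ReceivedEvent event) {
        return new ReceivedEvent(event.body,
                Collections.<String, Object>emptyMap(),
                event.offset,
                event.sequenceNumber);
    }

    /** Mirrors the reviewer's suggestion: hand the received event back untouched. */
    static ReceivedEvent passThrough(ReceivedEvent event) {
        return event;
    }

    /** Builds a sample event with one application property (made-up values). */
    static ReceivedEvent sampleEvent() {
        Map<String, Object> props = new HashMap<>();
        props.put("deviceId", "sensor-7");
        return new ReceivedEvent("hello".getBytes(StandardCharsets.UTF_8), props, "1024", 42L);
    }

    public static void main(String[] args) {
        ReceivedEvent original = sampleEvent();
        System.out.println("rebuilt keeps deviceId: "
                + rebuildFromBody(original).applicationProperties.containsKey("deviceId"));
        System.out.println("pass-through keeps deviceId: "
                + passThrough(original).applicationProperties.containsKey("deviceId"));
    }
}
```

In the actual spout, the pass-through variant would correspond to returning the SDK's own event object from receive() instead of wrapping it in a second type; how downstream code reads its properties depends on the SDK version in use.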

