Github user rban1 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1985#discussion_r104231241
  
    --- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
 ---
    @@ -65,77 +60,80 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, 
String partitionId) {
       }
     
       @Override
    -  public void open(IEventHubFilter filter) throws EventHubException {
    -    logger.info("creating eventhub receiver: partitionId=" + partitionId + 
    -                   ", filterString=" + filter.getFilterString());
    +  public void open(String offset) throws EventHubException {
    +    logger.info("creating eventhub receiver: partitionId=" + partitionId +
    +            ", offset=" + offset);
         long start = System.currentTimeMillis();
    -    receiver = new ResilientEventHubReceiver(connectionString, entityName,
    -                   partitionId, consumerGroupName, defaultCredits, filter);
    -    receiver.initialize();
    -    
    +    try {
    +      ehClient = 
EventHubClient.createFromConnectionStringSync(connectionString);
    +      receiver = ehClient.createEpochReceiverSync(
    +              consumerGroupName,
    +              partitionId,
    +              offset,
    +              false,
    +              1);
    +    }catch (Exception e){
    +      logger.info("Exception in creating EventhubClient"+e.toString());
    +    }
         long end = System.currentTimeMillis();
         logger.info("created eventhub receiver, time taken(ms): " + 
(end-start));
       }
     
       @Override
    -  public void close() {
    +  public void close(){
         if(receiver != null) {
    -      receiver.close();
    +      try {
    +        receiver.close().whenComplete((voidargs,error)->{
    +          try{
    +            if(error!=null){
    +              logger.error("Exception during receiver close 
phase"+error.toString());
    +            }
    +            ehClient.closeSync();
    +          }catch (Exception e){
    +            logger.error("Exception during ehclient close 
phase"+e.toString());
    +          }
    +        }).get();
    +      }catch (InterruptedException e){
    +        logger.error("Exception occured during close phase"+e.toString());
    +      }catch (ExecutionException e){
    +        logger.error("Exception occured during close phase"+e.toString());
    +      }
           logger.info("closed eventhub receiver: partitionId=" + partitionId );
           receiver = null;
    +      ehClient =  null;
         }
       }
    +
       
       @Override
       public boolean isOpen() {
         return (receiver != null);
       }
     
       @Override
    -  public EventData receive(long timeoutInMilliseconds) {
    +  public EventDataWrap receive() {
         long start = System.currentTimeMillis();
    -    Message message = receiver.receive(timeoutInMilliseconds);
    +    Iterable<EventData> receivedEvents=null;
    +    /*Get one message at a time for backward compatibility behaviour*/
    +    try {
    +      receivedEvents = receiver.receiveSync(1);
    +    }catch (Exception e){
    +      logger.error("Exception occured during receive"+e.toString());
    +    }
         long end = System.currentTimeMillis();
         long millis = (end - start);
         receiveApiLatencyMean.update(millis);
         receiveApiCallCount.incr();
    -    
    -    if (message == null) {
    -      //Temporary workaround for AMQP/EH bug of failing to receive messages
    -      /*if(timeoutInMilliseconds > 100 && millis < 
timeoutInMilliseconds/2) {
    -        throw new RuntimeException(
    -            "Restart EventHubSpout due to failure of receiving messages in 
"
    -            + millis + " millisecond");
    -      }*/
    +    if (receivedEvents == null) {
    --- End diff --
    
    Check the length as well, in case receivedEvents is non-null but empty.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to