-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/66899/#review202396
-----------------------------------------------------------




webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 577 (patched)
<https://reviews.apache.org/r/66899/#comment284205>

    This catch block swallows the exception. Consider using finally instead, so that the exception still propagates:
    
    long offsetFailedCommit = -1;
    
    void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) {
      boolean isCommitSuccess = false;
    
      try {
        ...
    
        isCommitSuccess = true;
      } finally {
        offsetFailedCommit = isCommitSuccess ? -1 : kafkaMessage.getOffset();
      }
    }
    
    void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg)
        throws AtlasServiceException, AtlasException {
    
      if (offsetFailedCommit != -1 && offsetFailedCommit == kafkaMsg.getOffset()) {
        LOG.info("message was already processed: offset={}", kafkaMsg.getOffset());
    
        offsetFailedCommit = -1;
    
        consumer.commit(new TopicPartition("ATLAS_HOOK", kafkaMsg.getPartition()),
                        kafkaMsg.getOffset() + 1);
    
        return;
      }
    
      ...
    
    }
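
A self-contained sketch of the same idea, for trying the behaviour outside Atlas. The names OffsetCommitter and FailedCommitTracker are hypothetical stand-ins (they are not Atlas or Kafka classes), and messages are reduced to their offsets. One deliberate difference from the snippet above: the skip path goes through the same commit() helper, so a commit that fails a second time is recorded again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the consumer's commit call; not the Atlas/Kafka API.
interface OffsetCommitter {
    void commit(long nextOffset);
}

class FailedCommitTracker {
    private static final long NONE = -1;

    private final OffsetCommitter committer;
    private final List<Long> processed = new ArrayList<>();
    private long offsetFailedCommit = NONE;

    FailedCommitTracker(OffsetCommitter committer) {
        this.committer = committer;
    }

    // Returns true if the message was processed, false if it was recognised
    // as a redelivery of a message whose commit failed earlier.
    boolean handleMessage(long offset) {
        if (offsetFailedCommit != NONE && offsetFailedCommit == offset) {
            // Already processed: skip the work and only retry the commit.
            commit(offset);
            return false;
        }

        processed.add(offset); // placeholder for the real message handling
        commit(offset);
        return true;
    }

    // No catch block: a failure still propagates to the caller, while the
    // finally block records which offset was left uncommitted.
    private void commit(long offset) {
        boolean isCommitSuccess = false;

        try {
            committer.commit(offset + 1);
            isCommitSuccess = true;
        } finally {
            offsetFailedCommit = isCommitSuccess ? NONE : offset;
        }
    }

    long getOffsetFailedCommit() {
        return offsetFailedCommit;
    }

    List<Long> getProcessed() {
        return processed;
    }
}
```

With a committer that throws on the first call, handleMessage(5) processes the message and rethrows; a second handleMessage(5) skips processing and only commits offset 6.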


- Madhan Neethiraj


On May 3, 2018, 11:29 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/66899/
> -----------------------------------------------------------
> 
> (Updated May 3, 2018, 11:29 p.m.)
> 
> 
> Review request for atlas and Madhan Neethiraj.
> 
> 
> Bugs: ATLAS-2634
>     https://issues.apache.org/jira/browse/ATLAS-2634
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> **Approach**
> Simplified approach.
> 
> **Algorithm**
>  - Modified: _NotificationHookConsumer_:
>    - when commit() fails, record the offset of the current message.
>    - on receiving a message
>       - if the recorded offset is the same as that of the received message, skip processing the message and go ahead with the commit.
>       - reset the recorded offset.
>  - Added appropriate log messages.
> 
> **Additional Refactoring**
> None.
> 
> 
> Diffs
> -----
> 
>   notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 80dc51423 
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java 7a4596a7e 
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java d2b3dfdab 
> 
> 
> Diff: https://reviews.apache.org/r/66899/diff/3/
> 
> 
> Testing
> -------
> 
> **Unit tests**
> Additional unit tests to test specific scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>
