-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review183832
-----------------------------------------------------------




webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 274 (patched)
<https://reviews.apache.org/r/61665/#comment259905>

    I think it will be useful to abstract this functionality in a class of its 
own.
    
    class AdaptiveWaiter {
      public AdaptiveWaiter(long minWaitTime, long maxWaitTime, long 
waitIncrement) {
        resetInterval = maxWaitTime * 2;
      }
    
      public wait(String message) {
        long now               = System.currentTimeMillis();
        long timeSinceLastWait = now - lastWaitAt;
    
        if (timeSinceLastWait > resetInterval) { // it has been a long time 
since the last call
          waitTime = minWaitTime;
        } else {
          waitTime = timeSinceLastWait + waitIncrement;
    
          if (waitTime > maxWaitTime) {
            waitTime = maxWaitTime;
          }
        }
        
        lastWaitAt = now;
    
        LOG.debug(...);
    
        sleep(waitTime);
      }


- Madhan Neethiraj


On Aug. 17, 2017, 5:04 a.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 17, 2017, 5:04 a.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
>     https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> Please refer to 
> [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background 
> and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.aquire_. This method 
> is called at the beginning of almost every method in this class. The method 
> checks if the consumer is closed, if it is then it throws 
> IllegalStateException.
> 
> Scenario may come about in this way:
> - Shutdown has been initiated. Close on consumer is called.
> - However, the consumer thread is just about to enter another poll cycle.
> - Thus acquire sees that consumer is closed and throws the exception (2nd 
> bullet above).
> 
> Please take a look at this scala code. This is _ShutdownableThread_. The 
> thread does the job of handling all exceptions. Upon exception, it manages 
> the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the 
> _isRunning_ loop.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> Special treatment is given to _IllegalStateException_ by implementing pause & 
> retry logic:
> - Modified _LOG_ to _debug_. That way logs are not filled during retry.
> - _HookConsumer_ is more resilient. It handles exceptions resulting from 
> _Kafka_ and entity APIs.
> 
> 
> Diffs
> -----
> 
>   
> webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
>  ef64c3b 
>   
> webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
>  a6f58e8 
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/2/
> 
> 
> Testing
> -------
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>

Reply via email to