-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review184093
-----------------------------------------------------------




webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 227 (patched)
<https://reviews.apache.org/r/61665/#comment260095>

    Consider marking 'resetInterval' as final.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 257 (patched)
<https://reviews.apache.org/r/61665/#comment260099>

    Consider renaming updateDurations() to setWaitDuration() and call from line 
#240.
    
    void setWaitDuration() {
      long now               = System.currentTimeMillis();
      long timeSinceLastWait = now - lastWaitAt;
    
      if (timeSinceLastWait > resetInterval) {
        waitDuration = minDuration; // reset
      } else if (waitDuration != maxDuration) {
        waitDuration += increment;
    
        if (waitDuration > maxDuration) {
          waitDuration = maxDuration;
        }
      }
      
      lastWaitAt = now;
    }



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 264 (patched)
<https://reviews.apache.org/r/61665/#comment260094>

    Shouldn't waitDuration be set to 'maxDuration' here? Else, the next wait 
will start from minDuration - causing more frequent retries.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 275 (patched)
<https://reviews.apache.org/r/61665/#comment260103>

    Consider reading minWaitDuration, maxWaitDuration from configuration, right 
next to where consumerRetryInterval is read currently (line #120).
    
    consumerRetryInterval = 
applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
    minWaitDuration       = applicationProperties.getInt(MIN_RETRY_INTERVAL, 
consumerRetryInterval); // 500 ms  by default
    maxWaitDuration       = applicationProperties.getInt(MAX_RETRY_INTERVAL, 
minWaitDuration * 60);  //  30 sec by default



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 279 (patched)
<https://reviews.apache.org/r/61665/#comment260104>

    'pauseDuration' seems to be used only in tests; considering removing this 
member and update tests to use minWaitDuration.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Line 246 (original), 306 (patched)
<https://reviews.apache.org/r/61665/#comment260105>

    Why not call 'adaptiveWaiter.pause()' here as well?


- Madhan Neethiraj


On Aug. 29, 2017, 9:01 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 29, 2017, 9:01 p.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
>     https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> Please refer to 
> [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background 
> and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.aquire_. This method 
> is called at the beginning of almost every method in this class. The method 
> checks if the consumer is closed, if it is then it throws 
> IllegalStateException.
> 
> Scenario may come about in this way:
> - Shutdown has been initiated. Close on consumer is called.
> - However, the consumer thread is just about to enter another poll cycle.
> - Thus acquire sees that consumer is closed and throws the exception (2nd 
> bullet above).
> 
> Please take a look at this scala code. This is _ShutdownableThread_. The 
> thread does the job of handling all exceptions. Upon exception, it manages 
> the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the 
> _isRunning_ loop.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> Special treatment is given to _IllegalStateException_ by implementing pause & 
> retry logic:
> - Modified _LOG_ to _debug_. That way logs are not filled during retry.
> - _HookConsumer_ is more resilient. It handles exceptions resulting from 
> _Kafka_ and entity APIs.
> 
> 
> Diffs
> -----
> 
>   
> webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
>  ef64c3b 
>   webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java 
> PRE-CREATION 
>   
> webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
>  a6f58e8 
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/3/
> 
> 
> Testing
> -------
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>

Reply via email to