Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 dc5ebabe1 -> 37f59dc95


ATLAS-2877: updated notification processing to wait only before retry

(cherry picked from commit 48e522497f960ac9cc54853c3cb0e9af3e0111f3)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/37f59dc9
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/37f59dc9
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/37f59dc9

Branch: refs/heads/branch-0.8
Commit: 37f59dc954d5e2575d2068d80ffa5057e07266e0
Parents: dc5ebab
Author: Madhan Neethiraj <[email protected]>
Authored: Tue Sep 18 09:01:25 2018 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Tue Sep 18 10:39:14 2018 -0700

----------------------------------------------------------------------
 .../notification/NotificationHookConsumer.java  | 38 +++++++++++---------
 1 file changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/37f59dc9/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 475be9a..88a8cce 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -282,10 +282,10 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     @VisibleForTesting
     class HookConsumer extends ShutdownableThread {
         private final NotificationConsumer<HookNotificationMessage> consumer;
-        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-        private List<HookNotificationMessage> failedMessages = new 
ArrayList<>();
+        private final AtomicBoolean                                 shouldRun  
    = new AtomicBoolean(false);
+        private final List<String>                                  
failedMessages = new ArrayList<>();
+        private final AdaptiveWaiter                                
adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, 
minWaitDuration);
 
-        private final AdaptiveWaiter adaptiveWaiter = new 
AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
         public HookConsumer(NotificationConsumer<HookNotificationMessage> 
consumer) {
             super("atlas-hook-consumer-thread", false);
@@ -496,21 +496,27 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                     } catch (Throwable e) {
                         RequestContextV1.get().resetEntityGuidUpdates();
 
-                        LOG.warn("Error handling message", e);
-                        try {
-                            LOG.info("Sleeping for {} ms before retry", 
consumerRetryInterval);
-                            Thread.sleep(consumerRetryInterval);
-                        } catch (InterruptedException ie) {
-                            LOG.error("Notification consumer thread sleep 
interrupted");
-                        }
-
                         if (numRetries == (maxRetries - 1)) {
-                            LOG.warn("Max retries exceeded for message {}", 
message, e);
-                            failedMessages.add(message);
+                            String strMessage = 
AbstractNotification.getMessageJson(message);
+
+                            LOG.warn("Max retries exceeded for message {}", 
strMessage, e);
+
+                            failedMessages.add(strMessage);
+
                             if (failedMessages.size() >= failedMsgCacheSize) {
                                 recordFailedMessages();
                             }
                             return;
+                        } else {
+                            LOG.warn("Error handling message", e);
+
+                            try {
+                                LOG.info("Sleeping for {} ms before retry", 
consumerRetryInterval);
+
+                                Thread.sleep(consumerRetryInterval);
+                            } catch (InterruptedException ie) {
+                                LOG.error("Notification consumer thread sleep 
interrupted");
+                            }
                         }
                     } finally {
                         RequestContext.clear();
@@ -525,8 +531,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
         private void recordFailedMessages() {
             //logging failed messages
-            for (HookNotificationMessage message : failedMessages) {
-                FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", 
AbstractNotification.getMessageJson(message));
+            for (String message : failedMessages) {
+                FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", message);
             }
             failedMessages.clear();
         }
@@ -589,4 +595,4 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, 
path, LOCALHOST,
                 DateTimeHelper.formatDateUTC(new Date()));
     }
-}
\ No newline at end of file
+}

Reply via email to