Repository: atlas Updated Branches: refs/heads/master 3176d1a1e -> 48e522497
ATLAS-2877: updated notification processing to wait only before retry Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/48e52249 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/48e52249 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/48e52249 Branch: refs/heads/master Commit: 48e522497f960ac9cc54853c3cb0e9af3e0111f3 Parents: 3176d1a Author: Madhan Neethiraj <[email protected]> Authored: Tue Sep 18 09:01:25 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Sep 18 09:10:59 2018 -0700 ---------------------------------------------------------------------- .../notification/NotificationHookConsumer.java | 31 +++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/48e52249/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index d680e4e..9f832b9 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -294,7 +294,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl class HookConsumer extends ShutdownableThread { private final NotificationConsumer<HookNotification> consumer; private final AtomicBoolean shouldRun = new AtomicBoolean(false); - private final List<HookNotification> failedMessages = new ArrayList<>(); + private final List<String> failedMessages = new ArrayList<>(); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); @VisibleForTesting @@ -523,26 +523,29 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } catch (Throwable e) { RequestContext.get().resetEntityGuidUpdates(); - LOG.warn("Error handling message", e); - try { - LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); - - Thread.sleep(consumerRetryInterval); - } catch (InterruptedException ie) { - LOG.error("Notification consumer thread sleep interrupted"); - } - if (numRetries == (maxRetries - 1)) { - LOG.warn("Max retries exceeded for message {}", message, e); + String strMessage = AbstractNotification.getMessageJson(message); + + LOG.warn("Max retries exceeded for message {}", strMessage, e); isFailedMsg = true; - failedMessages.add(message); + failedMessages.add(strMessage); if (failedMessages.size() >= failedMsgCacheSize) { recordFailedMessages(); } return; + } else { + LOG.warn("Error handling message", e); + + try { + LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); + + Thread.sleep(consumerRetryInterval); + } catch (InterruptedException ie) { + LOG.error("Notification consumer thread sleep interrupted"); + } } } finally { RequestContext.clear(); @@ -564,8 +567,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void recordFailedMessages() { //logging failed messages - for (HookNotification message : failedMessages) { - FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message)); + for (String message : failedMessages) { + FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", message); } failedMessages.clear();
