Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 dc5ebabe1 -> 37f59dc95
ATLAS-2877: updated notification processing to wait only before retry (cherry picked from commit 48e522497f960ac9cc54853c3cb0e9af3e0111f3) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/37f59dc9 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/37f59dc9 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/37f59dc9 Branch: refs/heads/branch-0.8 Commit: 37f59dc954d5e2575d2068d80ffa5057e07266e0 Parents: dc5ebab Author: Madhan Neethiraj <[email protected]> Authored: Tue Sep 18 09:01:25 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Sep 18 10:39:14 2018 -0700 ---------------------------------------------------------------------- .../notification/NotificationHookConsumer.java | 38 +++++++++++--------- 1 file changed, 22 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/37f59dc9/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 475be9a..88a8cce 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -282,10 +282,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @VisibleForTesting class HookConsumer extends ShutdownableThread { private final NotificationConsumer<HookNotificationMessage> consumer; - private final AtomicBoolean shouldRun = new AtomicBoolean(false); - private List<HookNotificationMessage> failedMessages = new ArrayList<>(); + private final AtomicBoolean shouldRun = new AtomicBoolean(false); + private final 
List<String> failedMessages = new ArrayList<>(); + private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); - private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) { super("atlas-hook-consumer-thread", false); @@ -496,21 +496,27 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } catch (Throwable e) { RequestContextV1.get().resetEntityGuidUpdates(); - LOG.warn("Error handling message", e); - try { - LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); - Thread.sleep(consumerRetryInterval); - } catch (InterruptedException ie) { - LOG.error("Notification consumer thread sleep interrupted"); - } - if (numRetries == (maxRetries - 1)) { - LOG.warn("Max retries exceeded for message {}", message, e); - failedMessages.add(message); + String strMessage = AbstractNotification.getMessageJson(message); + + LOG.warn("Max retries exceeded for message {}", strMessage, e); + + failedMessages.add(strMessage); + if (failedMessages.size() >= failedMsgCacheSize) { recordFailedMessages(); } return; + } else { + LOG.warn("Error handling message", e); + + try { + LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); + + Thread.sleep(consumerRetryInterval); + } catch (InterruptedException ie) { + LOG.error("Notification consumer thread sleep interrupted"); + } } } finally { RequestContext.clear(); @@ -525,8 +531,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void recordFailedMessages() { //logging failed messages - for (HookNotificationMessage message : failedMessages) { - FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message)); + for (String message : failedMessages) { + FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", message); } 
failedMessages.clear(); } @@ -589,4 +595,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date())); } -} \ No newline at end of file +}
