Repository: incubator-atlas Updated Branches: refs/heads/master ec94d2ad1 -> f408e93ee
ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f408e93e Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f408e93e Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f408e93e Branch: refs/heads/master Commit: f408e93eebfd870f7eef547438c41260f729fdd9 Parents: ec94d2a Author: Suma Shivaprasad <[email protected]> Authored: Fri Aug 19 11:02:39 2016 -0700 Committer: Suma Shivaprasad <[email protected]> Committed: Fri Aug 19 11:02:39 2016 -0700 ---------------------------------------------------------------------- distro/src/conf/atlas-application.properties | 3 +++ .../src/main/java/org/apache/atlas/hook/AtlasHook.java | 12 +++++++++++- .../java/org/apache/atlas/kafka/KafkaNotification.java | 1 - release-log.txt | 1 + .../atlas/notification/NotificationHookConsumer.java | 12 +++++++++++- 5 files changed, 26 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index 1b2cc81..d334600 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -65,6 +65,9 @@ atlas.kafka.auto.commit.enable=false atlas.notification.create.topics=true atlas.notification.replicas=1 atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES +atlas.notification.log.failed.messages=true +atlas.notification.consumer.retry.interval=500 +atlas.notification.hook.retry.interval=1000 # Enable for Kerberized Kafka clusters #atlas.notification.kafka.service.principal=kafka/[email protected] #atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 09b1c4b..93a10b4 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -54,6 +54,8 @@ public abstract class AtlasHook { private static boolean logFailedMessages; private static FailedMessagesLogger failedMessagesLogger; + private static int notificationRetryInterval; + public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = "atlas.notification.hook.retry.interval"; public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = "atlas.notification.failed.messages.filename"; @@ -76,6 +78,7 @@ public abstract class AtlasHook { failedMessagesLogger.init(); } + notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000); Injector injector = Guice.createInjector(new NotificationModule()); notifInterface = injector.getInstance(NotificationInterface.class); @@ -128,7 +131,14 @@ public abstract class AtlasHook { } catch (Exception e) { numRetries++; if (numRetries < maxRetries) { - LOG.info("Failed to notify atlas for entity {}. Retrying", message, e); + LOG.error("Notification send retry failed"); + try { + LOG.info("Sleeping for {} ms before retry", notificationRetryInterval); + Thread.sleep(notificationRetryInterval); + } catch (InterruptedException ie){ + LOG.error("Notification hook thread sleep interrupted"); + } + } else { if (shouldLogFailedMessages && e instanceof NotificationException) { List<String> failedMessages = ((NotificationException) e).getFailedMessages(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 806f2b4..2309ede 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -241,7 +241,6 @@ public class KafkaNotification extends AbstractNotification implements Service { LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), response.partition(), response.offset()); } catch (Exception e) { - LOG.warn("Could not send message - {}", context.getMessage(), e); lastFailureException = e; failedMessages.add(context.getMessage()); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index f5e3441..98e61e8 100644 --- a/release-log.txt +++ b/release-log.txt @@ -10,6 +10,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ALL CHANGES: +ATLAS-1129 Remove notification failed logs on retry and add sleep between retries (svimal2106 via sumasai) ATLAS-1126 Fix NPE in getSchema calls (sumasai) ATLAS-1125 Enable compression on hbase audit table (shwethags via sumasai) ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f408e93e/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 6b1f3f2..a4fd1c2 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -54,11 +54,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize"; + public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; private final LocalAtlasClient atlasClient; private final int maxRetries; private final int failedMsgCacheSize; + private final int consumerRetryInterval; private NotificationInterface notificationInterface; private ExecutorService executors; @@ -74,6 +76,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); + consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); } @@ -246,7 +249,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl break; } catch (Throwable e) { - LOG.warn("Error handling message", e); + LOG.warn("Error handling message" + e.getMessage()); + try{ + LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); + Thread.sleep(consumerRetryInterval); + }catch (InterruptedException ie){ + LOG.error("Notification consumer thread sleep interrupted"); + } + if (numRetries == (maxRetries - 1)) { LOG.warn("Max retries exceeded for message {}", message, e); failedMessages.add(message);
