Repository: atlas Updated Branches: refs/heads/branch-0.8 37f59dc95 -> 2f7348988
ATLAS-2634: Avoid duplicate message processing. Signed-off-by: Ashutosh Mestry <ames...@hortonworks.com> (cherry picked from commit f29a2b7bb2b555e68d7f5e2b43221f85877aa39c) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2f734898 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2f734898 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2f734898 Branch: refs/heads/branch-0.8 Commit: 2f7348988b992e8a9e5a71cf1a483803fa7d6db8 Parents: 37f59dc Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Thu May 3 16:22:10 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Tue Sep 18 10:58:21 2018 -0700 ---------------------------------------------------------------------- .../apache/atlas/kafka/KafkaNotification.java | 3 +- .../notification/NotificationHookConsumer.java | 42 +++++++++++- .../NotificationHookConsumerKafkaTest.java | 67 ++++++++++++++++++++ 3 files changed, 108 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 4c63027..4c753d2 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -245,8 +245,9 @@ public class KafkaNotification extends AbstractNotification implements Service { } + @VisibleForTesting // Get properties for consumer request - private Properties getConsumerProperties(NotificationType type) { + public Properties getConsumerProperties(NotificationType type) { // find the configured group id for the given notification type String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY); http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 88a8cce..1a567af 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -286,10 +286,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List<String> failedMessages = new ArrayList<>(); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); + @VisibleForTesting + final FailedCommitOffsetRecorder failedCommitOffsetRecorder; public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) { super("atlas-hook-consumer-thread", false); this.consumer = consumer; + failedCommitOffsetRecorder = new FailedCommitOffsetRecorder(); } @Override @@ -342,6 +345,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } try { + if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) { + commit(kafkaMsg); + return; + } + // Used for intermediate conversions during create and update for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { @@ -538,9 +546,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) { - recordFailedMessages(); - TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); - consumer.commit(partition, kafkaMessage.getOffset() + 1); + boolean commitSucceessStatus = false; + try { + recordFailedMessages(); + + TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); + + consumer.commit(partition, kafkaMessage.getOffset() + 1); + commitSucceessStatus = true; + } finally { + failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); + } } boolean serverAvailable(Timer timer) { @@ -595,4 +611,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date())); } + + static class FailedCommitOffsetRecorder { + private Long currentOffset; + + public void recordIfFailed(boolean commitStatus, long offset) { + if(commitStatus) { + currentOffset = null; + } else { + currentOffset = offset; + } + } + + public boolean isMessageReplayed(long offset) { + return currentOffset != null && currentOffset == offset; + } + + public Long getCurrentOffset() { + return currentOffset; + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index e7a400e..14ecc2d 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -35,6 +35,9 @@ import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.lang.RandomStringUtils; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; @@ -43,6 +46,7 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; import java.util.List; +import java.util.Properties; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; @@ -112,6 +116,38 @@ public class NotificationHookConsumerKafkaTest { reset(atlasEntityStore); } + @Test + public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException { + + ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder; + + produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); + + try { + produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); + consumeOneMessage(consumer, hookConsumer); + consumeOneMessage(consumer, hookConsumer); + } + catch(KafkaException ex) { + assertTrue(true, "ExceptionThrowing consumer throws an excepion."); + } + + assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1); + + consumer.disableCommitExpcetion(); + + produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); + consumeOneMessage(consumer, hookConsumer); + consumeOneMessage(consumer, hookConsumer); + + assertNull(failedCommitOffsetRecorder.getCurrentOffset()); + + reset(atlasEntityStore); + } + @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled") public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception { produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity())); @@ -138,6 +174,12 @@ public class NotificationHookConsumerKafkaTest { return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0); } + ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { + Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK); + KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true); + return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000); + } + void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer, NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException { try { @@ -197,4 +239,29 @@ public class NotificationHookConsumerKafkaTest { private void produceMessage(HookNotificationMessage message) throws NotificationException { kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message); } + + private static class ExceptionThrowingCommitConsumer extends AtlasKafkaConsumer { + + private boolean exceptionThrowingEnabled; + + public ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType notificationType, + KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) { + super(notificationType.getDeserializer(), kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds); + exceptionThrowingEnabled = true; + } + + @Override + public void commit(TopicPartition partition, long offset) { + if(exceptionThrowingEnabled) { + throw new KafkaException("test case verifying exception"); + } + else { + super.commit(partition, offset); + } + } + + public void disableCommitExpcetion() { + exceptionThrowingEnabled = false; + } + } }