Repository: atlas Updated Branches: refs/heads/master 015b8bf38 -> f29a2b7bb
ATLAS-2634: Avoid duplicate message processing. Signed-off-by: Ashutosh Mestry <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/f29a2b7b Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/f29a2b7b Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/f29a2b7b Branch: refs/heads/master Commit: f29a2b7bb2b555e68d7f5e2b43221f85877aa39c Parents: 015b8bf Author: Ashutosh Mestry <[email protected]> Authored: Thu May 3 16:22:10 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Fri May 4 15:54:17 2018 -0700 ---------------------------------------------------------------------- .../apache/atlas/kafka/KafkaNotification.java | 6 +- .../notification/NotificationHookConsumer.java | 42 ++++++++++++- .../NotificationHookConsumerKafkaTest.java | 66 ++++++++++++++++++++ 3 files changed, 109 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 80dc514..00e56e3 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -241,8 +241,10 @@ public class KafkaNotification extends AbstractNotification implements Service { } - // Get properties for consumer request - private Properties getConsumerProperties(NotificationType type) { + @VisibleForTesting + public + // Get properties for consumer request + Properties getConsumerProperties(NotificationType type) { // find the configured group id for the given notification type String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY); http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 7a4596a..f5e555d 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -55,6 +55,7 @@ import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.filters.AuditFilter.AuditLog; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.configuration.Configuration; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -297,10 +298,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List<HookNotification> failedMessages = new ArrayList<>(); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); + @VisibleForTesting + final FailedCommitOffsetRecorder failedCommitOffsetRecorder; + public HookConsumer(NotificationConsumer<HookNotification> consumer) { super("atlas-hook-consumer-thread", false); this.consumer = consumer; + failedCommitOffsetRecorder = new FailedCommitOffsetRecorder(); } @Override @@ -358,6 +363,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } try { + if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) { + commit(kafkaMsg); + return; + } + // Used for intermediate conversions during create and update for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { @@ -558,11 +568,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) { - recordFailedMessages(); + boolean commitSucceessStatus = false; + try { + recordFailedMessages(); - TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); + TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); - consumer.commit(partition, kafkaMessage.getOffset() + 1); + consumer.commit(partition, kafkaMessage.getOffset() + 1); + commitSucceessStatus = true; + } finally { + failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); + } } boolean serverAvailable(Timer timer) { @@ -612,4 +628,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl LOG.info("<== HookConsumer shutdown()"); } } + + static class FailedCommitOffsetRecorder { + private Long currentOffset; + + public void recordIfFailed(boolean commitStatus, long offset) { + if(commitStatus) { + currentOffset = null; + } else { + currentOffset = offset; + } + } + + public boolean isMessageReplayed(long offset) { + return currentOffset != null && currentOffset == offset; + } + + public Long getCurrentOffset() { + return currentOffset; + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index d2b3dfd..dbe1a07 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -34,6 +34,9 @@ import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.lang.RandomStringUtils; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; @@ -42,6 +45,7 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import java.util.List; +import java.util.Properties; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; @@ -115,6 +119,38 @@ public class NotificationHookConsumerKafkaTest { reset(atlasEntityStore); } + @Test + public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException { + + ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder; + + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity())); + + try { + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); + consumeOneMessage(consumer, hookConsumer); + consumeOneMessage(consumer, hookConsumer); + } + catch(KafkaException ex) { + assertTrue(true, "ExceptionThrowing consumer throws an excepion."); + } + + assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1); + + consumer.disableCommitExpcetion(); + + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); + consumeOneMessage(consumer, hookConsumer); + consumeOneMessage(consumer, hookConsumer); + + assertNull(failedCommitOffsetRecorder.getCurrentOffset()); + + reset(atlasEntityStore); + } + @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled") public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception { produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity())); @@ -140,6 +176,12 @@ public class NotificationHookConsumerKafkaTest { return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0); } + ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { + Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK); + KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true); + return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000); + } + void consumeOneMessage(NotificationConsumer<HookNotification> consumer, NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException { try { @@ -205,4 +247,28 @@ public class NotificationHookConsumerKafkaTest { } } + private static class ExceptionThrowingCommitConsumer extends AtlasKafkaConsumer { + + private boolean exceptionThrowingEnabled; + + public ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType notificationType, + KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) { + super(notificationType, kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds); + exceptionThrowingEnabled = true; + } + + @Override + public void commit(TopicPartition partition, long offset) { + if(exceptionThrowingEnabled) { + throw new KafkaException("test case verifying exception"); + } + else { + super.commit(partition, offset); + } + } + + public void disableCommitExpcetion() { + exceptionThrowingEnabled = false; + } + } }
