Repository: incubator-atlas
Updated Branches:
  refs/heads/master 07b8b4d3c -> 98769871e


ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant 
of failover. (yhemanth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/98769871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/98769871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/98769871

Branch: refs/heads/master
Commit: 98769871e56d9a97792e2dba52345e876908ac63
Parents: 07b8b4d
Author: Hemanth Yamijala <[email protected]>
Authored: Fri May 13 09:48:20 2016 +0530
Committer: Hemanth Yamijala <[email protected]>
Committed: Fri May 13 09:48:20 2016 +0530

----------------------------------------------------------------------
 distro/src/conf/atlas-application.properties    |   1 +
 .../org/apache/atlas/kafka/KafkaConsumer.java   |  27 +++-
 .../apache/atlas/kafka/KafkaNotification.java   |  55 +++++---
 .../notification/AbstractNotification.java      |   9 +-
 .../AbstractNotificationConsumer.java           |   2 +
 .../notification/NotificationConsumer.java      |   9 ++
 .../apache/atlas/kafka/KafkaConsumerTest.java   |  55 +++++++-
 .../atlas/kafka/KafkaNotificationTest.java      | 119 ++++++----------
 .../AbstractNotificationConsumerTest.java       |   5 +
 release-log.txt                                 |   1 +
 .../main/resources/atlas-application.properties |   1 +
 .../notification/NotificationHookConsumer.java  |  82 ++++++-----
 .../NotificationHookConsumerKafkaTest.java      | 141 +++++++++++++++++++
 .../NotificationHookConsumerTest.java           |  36 +++++
 14 files changed, 397 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties 
b/distro/src/conf/atlas-application.properties
index 119865d..b2b62aa 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -59,6 +59,7 @@ atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
 atlas.kafka.auto.offset.reset=smallest
 atlas.kafka.hook.group.id=atlas
+atlas.kafka.auto.commit.enable=false
 
 
 #########  Hive Lineage Configs  #########

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index f1c9742..270215d 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -19,6 +19,7 @@ package org.apache.atlas.kafka;
 
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import org.apache.atlas.notification.AbstractNotificationConsumer;
 import org.apache.atlas.notification.MessageDeserializer;
@@ -35,24 +36,29 @@ public class KafkaConsumer<T> extends 
AbstractNotificationConsumer<T> {
 
     private final int consumerId;
     private final ConsumerIterator iterator;
+    private final ConsumerConnector consumerConnector;
+    private final boolean autoCommitEnabled;
+    private long lastSeenOffset;
 
 
     // ----- Constructors ----------------------------------------------------
 
     /**
      * Create a Kafka consumer.
-     *
-     * @param type          the notification type returned by this consumer
      * @param deserializer  the message deserializer used for this consumer
      * @param stream        the underlying Kafka stream
      * @param consumerId    an id value for this consumer
+     * @param consumerConnector the {@link ConsumerConnector} which created 
the underlying Kafka stream
+     * @param autoCommitEnabled true if consumer does not need to commit 
offsets explicitly, false otherwise.
      */
-    public KafkaConsumer(Class<T> type,
-                         MessageDeserializer<T> deserializer, 
KafkaStream<String, String> stream, int consumerId) {
+    public KafkaConsumer(MessageDeserializer<T> deserializer, 
KafkaStream<String, String> stream, int consumerId,
+                         ConsumerConnector consumerConnector, boolean 
autoCommitEnabled) {
         super(deserializer);
-
+        this.consumerConnector = consumerConnector;
+        this.lastSeenOffset = 0;
         this.iterator   = stream.iterator();
         this.consumerId = consumerId;
+        this.autoCommitEnabled = autoCommitEnabled;
     }
 
 
@@ -71,6 +77,7 @@ public class KafkaConsumer<T> extends 
AbstractNotificationConsumer<T> {
         MessageAndMetadata message = iterator.next();
         LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, 
offset - {}, message - {}",
                 consumerId, message.topic(), message.partition(), 
message.offset(), message.message());
+        lastSeenOffset = message.offset();
         return (String) message.message();
     }
 
@@ -79,4 +86,14 @@ public class KafkaConsumer<T> extends 
AbstractNotificationConsumer<T> {
         MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
         return (String) message.message();
     }
+
+    @Override
+    public void commit() {
+        if (autoCommitEnabled) {
+            LOG.debug("Auto commit is disabled, not committing.");
+        } else {
+            consumerConnector.commitOffsets();
+            LOG.debug("Committed offset: {}", lastSeenOffset);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index cfffec4..1ee62d2 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.kafka;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Singleton;
 import kafka.consumer.Consumer;
 import kafka.consumer.KafkaStream;
@@ -112,9 +113,6 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
                 "org.apache.kafka.common.serialization.StringSerializer");
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringSerializer");
-
-        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
@@ -123,6 +121,10 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
     }
 
+    @VisibleForTesting
+    protected KafkaNotification(Properties properties) {
+        this.properties = properties;
+    }
 
     // ----- Service ---------------------------------------------------------
 
@@ -159,26 +161,34 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
     @Override
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType,
                                                              int numConsumers) 
{
+        return createConsumers(notificationType, numConsumers,
+                Boolean.valueOf(properties.getProperty("auto.commit.enable", 
"true")));
+    }
+
+    @VisibleForTesting
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType,
+                                                      int numConsumers, 
boolean autoCommitEnabled) {
         String topic = TOPIC_MAP.get(notificationType);
 
         Properties consumerProperties = 
getConsumerProperties(notificationType);
 
-        ConsumerConnector consumerConnector = 
createConsumerConnector(consumerProperties);
-        Map<String, Integer> topicCountMap = new HashMap<>();
-        topicCountMap.put(topic, numConsumers);
-        StringDecoder decoder = new StringDecoder(null);
-        Map<String, List<KafkaStream<String, String>>> streamsMap =
-                consumerConnector.createMessageStreams(topicCountMap, decoder, 
decoder);
-        List<KafkaStream<String, String>> kafkaConsumers = 
streamsMap.get(topic);
         List<NotificationConsumer<T>> consumers = new 
ArrayList<>(numConsumers);
-        int consumerId = 0;
-        for (KafkaStream stream : kafkaConsumers) {
-            KafkaConsumer<T> kafkaConsumer =
-                createKafkaConsumer(notificationType.getClassType(), 
notificationType.getDeserializer(),
-                    stream, consumerId++);
-            consumers.add(kafkaConsumer);
+        for (int i = 0; i < numConsumers; i++) {
+            ConsumerConnector consumerConnector = 
createConsumerConnector(consumerProperties);
+            Map<String, Integer> topicCountMap = new HashMap<>();
+            topicCountMap.put(topic, 1);
+            StringDecoder decoder = new StringDecoder(null);
+            Map<String, List<KafkaStream<String, String>>> streamsMap =
+                    consumerConnector.createMessageStreams(topicCountMap, 
decoder, decoder);
+            List<KafkaStream<String, String>> kafkaConsumers = 
streamsMap.get(topic);
+            for (KafkaStream stream : kafkaConsumers) {
+                KafkaConsumer<T> kafkaConsumer =
+                        createKafkaConsumer(notificationType.getClassType(), 
notificationType.getDeserializer(),
+                                stream, i, consumerConnector, 
autoCommitEnabled);
+                consumers.add(kafkaConsumer);
+            }
+            consumerConnectors.add(consumerConnector);
         }
-        consumerConnectors.add(consumerConnector);
 
         return consumers;
     }
@@ -245,12 +255,14 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
      * @param stream        the Kafka stream
      * @param consumerId    the id for the new consumer
      *
+     * @param consumerConnector
      * @return a new Kafka consumer
      */
-    protected <T> org.apache.atlas.kafka.KafkaConsumer<T> 
createKafkaConsumer(Class<T> type,
-            MessageDeserializer<T> deserializer, KafkaStream stream,
-            int consumerId) {
-        return new org.apache.atlas.kafka.KafkaConsumer<T>(type, deserializer, 
stream, consumerId);
+    protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
+    createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, 
KafkaStream stream,
+                        int consumerId, ConsumerConnector consumerConnector, 
boolean autoCommitEnabled) {
+        return new org.apache.atlas.kafka.KafkaConsumer<T>(deserializer, 
stream,
+                consumerId, consumerConnector, autoCommitEnabled);
     }
 
     // Get properties for consumer request
@@ -266,6 +278,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
         consumerProperties.putAll(properties);
         consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
+        LOG.info("Consumer property: auto.commit.enable: " + 
consumerProperties.getProperty("auto.commit.enable"));
         return consumerProperties;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
 
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 7d22126..cb44fc6 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.notification;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
@@ -46,7 +47,7 @@ public abstract class AbstractNotification implements 
NotificationInterface {
      */
     public static final MessageVersion CURRENT_MESSAGE_VERSION = new 
MessageVersion("1.0.0");
 
-    private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + 
".embedded";
+    public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + 
".embedded";
     private final boolean embedded;
     private final boolean isHAEnabled;
 
@@ -59,7 +60,6 @@ public abstract class AbstractNotification implements 
NotificationInterface {
         registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
         create();
 
-
     // ----- Constructors ----------------------------------------------------
 
     public AbstractNotification(Configuration applicationProperties) throws 
AtlasException {
@@ -67,6 +67,11 @@ public abstract class AbstractNotification implements 
NotificationInterface {
         this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties);
     }
 
+    @VisibleForTesting
+    protected AbstractNotification() {
+        embedded = false;
+        isHAEnabled = false;
+    }
 
     // ----- NotificationInterface -------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
 
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index f00bbca..d4d78de 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -68,4 +68,6 @@ public abstract class AbstractNotificationConsumer<T> 
implements NotificationCon
     public T peek() {
         return deserializer.deserialize(peekMessage());
     }
+
+    public abstract void commit();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
 
b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 78e8ce7..2e861cb 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -43,4 +43,13 @@ public interface NotificationConsumer<T> {
      * @return the next notification
      */
     T peek();
+
+    /**
+     * Commit the offset of messages that have been successfully processed.
+     *
+     * This API should be called when messages read with {@link #next()} have 
been successfully processed and
+     * the consumer is ready to handle the next message, which could happen 
even after a normal or an abnormal
+     * restart.
+     */
+    void commit();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 7f607c6..ad7d93e 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.atlas.kafka;
 
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.MessageVersion;
@@ -33,6 +34,9 @@ import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.codehaus.jettison.json.JSONException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
@@ -41,6 +45,8 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
 
@@ -51,6 +57,14 @@ public class KafkaConsumerTest {
 
     private static final String TRAIT_NAME = "MyTrait";
 
+    @Mock
+    private ConsumerConnector consumerConnector;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
     @Test
     public void testNext() throws Exception {
         KafkaStream<String, String> stream = mock(KafkaStream.class);
@@ -70,8 +84,9 @@ public class KafkaConsumerTest {
         when(messageAndMetadata.message()).thenReturn(json);
 
         NotificationConsumer<HookNotification.HookNotificationMessage> 
consumer =
-            new 
KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
-                NotificationInterface.NotificationType.HOOK.getDeserializer(), 
stream, 99);
+            new KafkaConsumer<>(
+                    
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
+                    consumerConnector, false);
 
         assertTrue(consumer.hasNext());
 
@@ -101,8 +116,9 @@ public class KafkaConsumerTest {
         when(messageAndMetadata.message()).thenReturn(json);
 
         NotificationConsumer<HookNotification.HookNotificationMessage> 
consumer =
-            new 
KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
-                NotificationInterface.NotificationType.HOOK.getDeserializer(), 
stream, 99);
+            new KafkaConsumer<>(
+                    
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
+                    consumerConnector, false);
 
         assertTrue(consumer.hasNext());
 
@@ -135,8 +151,9 @@ public class KafkaConsumerTest {
         when(messageAndMetadata.message()).thenReturn(json);
 
         NotificationConsumer<HookNotification.HookNotificationMessage> 
consumer =
-            new 
KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
-                NotificationInterface.NotificationType.HOOK.getDeserializer(), 
stream, 99);
+            new KafkaConsumer<>(
+                    
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
+                    consumerConnector, false);
 
         assertTrue(consumer.hasNext());
 
@@ -147,6 +164,32 @@ public class KafkaConsumerTest {
         assertTrue(consumer.hasNext());
     }
 
+    @Test
+    public void testCommitIsCalledIfAutoCommitDisabled() {
+        KafkaStream<String, String> stream = mock(KafkaStream.class);
+        NotificationConsumer<HookNotification.HookNotificationMessage> 
consumer =
+                new KafkaConsumer<>(
+                        
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
+                        consumerConnector, false);
+
+        consumer.commit();
+
+        verify(consumerConnector).commitOffsets();
+    }
+
+    @Test
+    public void testCommitIsNotCalledIfAutoCommitEnabled() {
+        KafkaStream<String, String> stream = mock(KafkaStream.class);
+        NotificationConsumer<HookNotification.HookNotificationMessage> 
consumer =
+                new KafkaConsumer<>(
+                        
NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99,
+                        consumerConnector, true);
+
+        consumer.commit();
+
+        verify(consumerConnector, never()).commitOffsets();
+    }
+
     private Referenceable getEntity(String traitName) {
         Referenceable entity = EntityNotificationImplTest.getEntity("id");
         List<IStruct> traitInfo = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 17fda25..219bd70 100644
--- 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -17,26 +17,16 @@
  */
 package org.apache.atlas.kafka;
 
-import com.google.inject.Inject;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.serializer.StringDecoder;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
-import org.apache.commons.configuration.Configuration;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -44,99 +34,80 @@ import java.util.Properties;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
-@Guice(modules = NotificationModule.class)
 public class KafkaNotificationTest {
 
-    @Inject
-    private KafkaNotification kafka;
-
-    @BeforeClass
-    public void setUp() throws Exception {
-        kafka.start();
-    }
-
     @Test
     @SuppressWarnings("unchecked")
     public void testCreateConsumers() throws Exception {
-        Configuration configuration = mock(Configuration.class);
-        Iterator iterator = mock(Iterator.class);
-        ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
-        KafkaStream kafkaStream1 = mock(KafkaStream.class);
-        KafkaStream kafkaStream2 = mock(KafkaStream.class);
-        String groupId = "groupId9999";
-
-        
when(configuration.subset(KafkaNotification.PROPERTY_PREFIX)).thenReturn(configuration);
-        when(configuration.getKeys()).thenReturn(iterator);
-        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
-        when(iterator.next()).thenReturn("entities." + 
KafkaNotification.CONSUMER_GROUP_ID_PROPERTY);
-        when(configuration.getList("entities." + 
KafkaNotification.CONSUMER_GROUP_ID_PROPERTY))
-                .thenReturn(Collections.<Object>singletonList(groupId));
-
-        Map<String, List<KafkaStream<String, String>>> streamsMap = new 
HashMap<>();
-        List<KafkaStream<String, String>> kafkaStreamList = new LinkedList<>();
-        kafkaStreamList.add(kafkaStream1);
-        kafkaStreamList.add(kafkaStream2);
-        streamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 
kafkaStreamList);
+        Properties properties = mock(Properties.class);
+        when(properties.getProperty("entities.group.id")).thenReturn("atlas");
+        final ConsumerConnector consumerConnector = 
mock(ConsumerConnector.class);
         Map<String, Integer> topicCountMap = new HashMap<>();
-        topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 2);
+        topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
 
-        when(consumerConnector.createMessageStreams(
-            eq(topicCountMap), any(StringDecoder.class), 
any(StringDecoder.class))).thenReturn(streamsMap);
-
-        TestKafkaNotification kafkaNotification = new 
TestKafkaNotification(configuration, consumerConnector);
+        Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap =
+                new HashMap<>();
+        List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
+        KafkaStream kafkaStream = mock(KafkaStream.class);
+        kafkaStreams.add(kafkaStream);
+        kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 
kafkaStreams);
 
-        List<NotificationConsumer<String>> consumers =
-            
kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES,
 2);
+        when(consumerConnector.createMessageStreams(
+                eq(topicCountMap), any(StringDecoder.class), 
any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
 
-        assertEquals(2, consumers.size());
+        final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
+        final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
 
-        // assert that all of the given kafka streams were used to create 
kafka consumers
-        List<KafkaStream> streams = kafkaNotification.kafkaStreams;
-        assertTrue(streams.contains(kafkaStream1));
-        assertTrue(streams.contains(kafkaStream2));
+        KafkaNotification kafkaNotification =
+                new TestKafkaNotification(properties, consumerConnector, 
consumer1, consumer2);
 
-        // assert that the given consumer group id was added to the properties 
used to create the consumer connector
-        Properties properties = kafkaNotification.myProperties;
-        assertEquals(groupId, 
properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
-    }
+        List<NotificationConsumer<String>> consumers =
+                
kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES,
 2);
 
-    @AfterClass
-    public void teardown() throws Exception {
-        kafka.stop();
+        verify(consumerConnector, times(2)).createMessageStreams(
+                eq(topicCountMap), any(StringDecoder.class), 
any(StringDecoder.class));
+        assertEquals(consumers.size(), 2);
+        assertTrue(consumers.contains(consumer1));
+        assertTrue(consumers.contains(consumer2));
     }
 
-    // Extended kafka notification class for testing.
-    private static class TestKafkaNotification extends KafkaNotification {
+    class TestKafkaNotification extends KafkaNotification {
 
         private final ConsumerConnector consumerConnector;
+        private final KafkaConsumer consumer1;
+        private final KafkaConsumer consumer2;
 
-        private Properties myProperties;
-        private List<KafkaStream> kafkaStreams = new LinkedList<>();
-
-        public TestKafkaNotification(Configuration applicationProperties,
-                                     ConsumerConnector consumerConnector) 
throws AtlasException {
-            super(applicationProperties);
+        TestKafkaNotification(Properties properties, ConsumerConnector 
consumerConnector,
+                              KafkaConsumer consumer1, KafkaConsumer 
consumer2) {
+            super(properties);
             this.consumerConnector = consumerConnector;
+            this.consumer1 = consumer1;
+            this.consumer2 = consumer2;
         }
 
         @Override
         protected ConsumerConnector createConsumerConnector(Properties 
consumerProperties) {
-            this.myProperties = consumerProperties;
-            kafkaStreams.clear();
             return consumerConnector;
         }
 
         @Override
-        protected <T> org.apache.atlas.kafka.KafkaConsumer<T> 
createKafkaConsumer(Class<T> type,
-                                                                               
   MessageDeserializer<T> deserializer,
-                                                                               
   KafkaStream stream,
-                                                                               
   int consumerId) {
-            kafkaStreams.add(stream);
-            return super.createKafkaConsumer(type, deserializer, stream, 
consumerId);
+        protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
+        createKafkaConsumer(Class<T> type, MessageDeserializer<T> 
deserializer, KafkaStream stream,
+                            int consumerId, ConsumerConnector connector, 
boolean autoCommitEnabled) {
+            if (consumerId == 0) {
+                return consumer1;
+            } else if (consumerId == 1) {
+                return consumer2;
+            }
+            return null;
         }
+
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
 
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index e63175d..e8b55ef 100644
--- 
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -253,6 +253,11 @@ public class AbstractNotificationConsumerTest {
         public boolean hasNext() {
             return index < messageList.size();
         }
+
+        @Override
+        public void commit() {
+            // do nothing.
+        }
     }
 
     private static final class TestDeserializer<T> extends 
VersionedMessageDeserializer<T> {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f68e86c..5d37d07 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -20,6 +20,7 @@ ATLAS-409 Atlas will not import avro tables with schema read 
from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons 
(venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant 
of failover. (yhemanth)
 ATLAS-758 hdfs location of hive table is pointing to old location even after 
rename ( sumasai )
 ATLAS-667 Entity delete should check for required reverse references ( dkantor 
via sumasai )
 ATLAS-738 Add query ability on system properties like guid, state, createdtime 
etc (shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/typesystem/src/main/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-application.properties 
b/typesystem/src/main/resources/atlas-application.properties
index a343a20..aafad0f 100644
--- a/typesystem/src/main/resources/atlas-application.properties
+++ b/typesystem/src/main/resources/atlas-application.properties
@@ -70,6 +70,7 @@ atlas.kafka.consumer.timeout.ms=100
 atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities
+atlas.kafka.auto.commit.enable=false
 
 #########  Entity Audit Configs  #########
 atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 8ef2f64..901b1ed 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.notification;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -183,50 +184,55 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             while (shouldRun.get()) {
                 try {
                     if (hasNext()) {
-                        HookNotification.HookNotificationMessage message = 
consumer.next();
-                        atlasClient.setUser(message.getUser());
-                        try {
-                            switch (message.getType()) {
-                            case ENTITY_CREATE:
-                                HookNotification.EntityCreateRequest 
createRequest =
-                                        (HookNotification.EntityCreateRequest) 
message;
-                                
atlasClient.createEntity(createRequest.getEntities());
-                                break;
+                        handleMessage(consumer.next());
+                    }
+                } catch (Throwable t) {
+                    LOG.warn("Failure in NotificationHookConsumer", t);
+                }
+            }
+        }
 
-                            case ENTITY_PARTIAL_UPDATE:
-                                HookNotification.EntityPartialUpdateRequest 
partialUpdateRequest =
-                                        
(HookNotification.EntityPartialUpdateRequest) message;
-                                
atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
-                                        partialUpdateRequest.getAttribute(),
-                                        
partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
-                                break;
+        @VisibleForTesting
+        void handleMessage(HookNotification.HookNotificationMessage message) {
+            atlasClient.setUser(message.getUser());
+            try {
+                switch (message.getType()) {
+                case ENTITY_CREATE:
+                    HookNotification.EntityCreateRequest createRequest =
+                            (HookNotification.EntityCreateRequest) message;
+                    atlasClient.createEntity(createRequest.getEntities());
+                    break;
 
-                            case ENTITY_DELETE:
-                                HookNotification.EntityDeleteRequest 
deleteRequest =
-                                    (HookNotification.EntityDeleteRequest) 
message;
-                                
atlasClient.deleteEntity(deleteRequest.getTypeName(),
-                                    deleteRequest.getAttribute(),
-                                    deleteRequest.getAttributeValue());
-                                break;
+                case ENTITY_PARTIAL_UPDATE:
+                    HookNotification.EntityPartialUpdateRequest 
partialUpdateRequest =
+                            (HookNotification.EntityPartialUpdateRequest) 
message;
+                    
atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+                            partialUpdateRequest.getAttribute(),
+                            partialUpdateRequest.getAttributeValue(), 
partialUpdateRequest.getEntity());
+                    break;
 
-                            case ENTITY_FULL_UPDATE:
-                                HookNotification.EntityUpdateRequest 
updateRequest =
-                                        (HookNotification.EntityUpdateRequest) 
message;
-                                
atlasClient.updateEntities(updateRequest.getEntities());
-                                break;
+                case ENTITY_DELETE:
+                    HookNotification.EntityDeleteRequest deleteRequest =
+                        (HookNotification.EntityDeleteRequest) message;
+                    atlasClient.deleteEntity(deleteRequest.getTypeName(),
+                        deleteRequest.getAttribute(),
+                        deleteRequest.getAttributeValue());
+                    break;
 
-                            default:
-                                throw new IllegalStateException("Unhandled 
exception!");
-                            }
-                        } catch (Exception e) {
-                            //todo handle failures
-                            LOG.warn("Error handling message {}", message, e);
-                        }
-                    }
-                } catch (Throwable t) {
-                    LOG.warn("Failure in NotificationHookConsumer", t);
+                case ENTITY_FULL_UPDATE:
+                    HookNotification.EntityUpdateRequest updateRequest =
+                            (HookNotification.EntityUpdateRequest) message;
+                    atlasClient.updateEntities(updateRequest.getEntities());
+                    break;
+
+                default:
+                    throw new IllegalStateException("Unhandled exception!");
                 }
+            } catch (Exception e) {
+                //todo handle failures
+                LOG.warn("Error handling message {}", message, e);
             }
+            consumer.commit();
         }
 
         boolean serverAvailable(Timer timer) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
new file mode 100644
index 0000000..5b2ffeb
--- /dev/null
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.inject.Inject;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.LocalAtlasClient;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@Guice(modules = NotificationModule.class)
+public class NotificationHookConsumerKafkaTest {
+
+    @Inject
+    private NotificationInterface notificationInterface;
+
+    private KafkaNotification kafkaNotification;
+
+    @BeforeTest
+    public void setup() throws AtlasException {
+        kafkaNotification = startKafkaServer();
+    }
+
+    @AfterTest
+    public void shutdown() {
+        kafkaNotification.stop();
+    }
+
+    @Test
+    public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws 
AtlasException, InterruptedException {
+
+        produceMessage(new HookNotification.EntityCreateRequest("test_user1", 
createEntity()));
+
+        NotificationConsumer<HookNotification.HookNotificationMessage> 
consumer =
+                createNewConsumer(kafkaNotification, false);
+        LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(kafkaNotification, 
localAtlasClient);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(consumer);
+
+        consumeOneMessage(consumer, hookConsumer);
+        verify(localAtlasClient).setUser("test_user1");
+
+
+        // produce another message, and make sure it moves ahead. If commit 
succeeded, this would work.
+        produceMessage(new HookNotification.EntityCreateRequest("test_user2", 
createEntity()));
+        consumeOneMessage(consumer, hookConsumer);
+        verify(localAtlasClient).setUser("test_user2");
+
+        kafkaNotification.close();
+    }
+
+    @Test
+    public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled()
+            throws NotificationException, InterruptedException {
+
+        produceMessage(new HookNotification.EntityCreateRequest("test_user3", 
createEntity()));
+
+        NotificationConsumer<HookNotification.HookNotificationMessage> 
consumer =
+                createNewConsumer(kafkaNotification, true);
+        LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(kafkaNotification, 
localAtlasClient);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(consumer);
+
+        consumeOneMessage(consumer, hookConsumer);
+        verify(localAtlasClient).setUser("test_user3");
+
+        // produce another message, but this will not be consumed, as commit 
code is not executed in hook consumer.
+        produceMessage(new HookNotification.EntityCreateRequest("test_user4", 
createEntity()));
+
+        consumeOneMessage(consumer, hookConsumer);
+        verify(localAtlasClient).setUser("test_user3");
+
+        kafkaNotification.close();
+    }
+
+    NotificationConsumer<HookNotification.HookNotificationMessage> 
createNewConsumer(
+            KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+        return 
kafkaNotification.<HookNotification.HookNotificationMessage>createConsumers(
+                NotificationInterface.NotificationType.HOOK, 1, 
autoCommitEnabled).get(0);
+    }
+
+    void 
consumeOneMessage(NotificationConsumer<HookNotification.HookNotificationMessage>
 consumer,
+                           NotificationHookConsumer.HookConsumer hookConsumer) 
throws InterruptedException {
+        while (!consumer.hasNext()) {
+            Thread.sleep(1000);
+        }
+        hookConsumer.handleMessage(consumer.next());
+    }
+
+    Referenceable createEntity() {
+        final Referenceable entity = new 
Referenceable(AtlasClient.DATA_SET_SUPER_TYPE);
+        entity.set("name", "db" + randomString());
+        entity.set("description", randomString());
+        return entity;
+    }
+
+    KafkaNotification startKafkaServer() throws AtlasException {
+        KafkaNotification kafkaNotification = (KafkaNotification) 
notificationInterface;
+        kafkaNotification.start();
+        return kafkaNotification;
+    }
+
+    protected String randomString() {
+        return RandomStringUtils.randomAlphanumeric(10);
+    }
+
+    private void produceMessage(HookNotification.HookNotificationMessage 
message) throws NotificationException {
+        
notificationInterface.send(NotificationInterface.NotificationType.HOOK, 
message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 8765826..7860eb6 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.LocalAtlasClient;
 import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.commons.configuration.Configuration;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -34,6 +35,7 @@ import java.util.concurrent.ExecutorService;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
@@ -87,6 +89,40 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
+    public void testCommitIsCalledWhenMessageIsProcessed() throws 
AtlasServiceException {
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(notificationInterface, 
atlasClient);
+        NotificationConsumer consumer = mock(NotificationConsumer.class);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(consumer);
+        HookNotification.EntityCreateRequest message = 
mock(HookNotification.EntityCreateRequest.class);
+        when(message.getUser()).thenReturn("user");
+        
when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
+
+        hookConsumer.handleMessage(message);
+
+        verify(consumer).commit();
+    }
+
+    @Test
+    public void testCommitIsCalledEvenWhenMessageProcessingFails() throws 
AtlasServiceException {
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(notificationInterface, 
atlasClient);
+        NotificationConsumer consumer = mock(NotificationConsumer.class);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(consumer);
+        HookNotification.EntityCreateRequest message = 
mock(HookNotification.EntityCreateRequest.class);
+        when(message.getUser()).thenReturn("user");
+        
when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
+        when(atlasClient.createEntity(any(List.class))).
+                thenThrow(new RuntimeException("Simulating exception in 
processing message"));
+
+        hookConsumer.handleMessage(message);
+
+        verify(consumer).commit();
+    }
+
+    @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws 
AtlasServiceException, InterruptedException {
         NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasClient);
         NotificationHookConsumer.HookConsumer hookConsumer =


Reply via email to