Repository: incubator-atlas Updated Branches: refs/heads/master 7f2a40864 -> 30893c5e5
ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/30893c5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/30893c5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/30893c5e Branch: refs/heads/master Commit: 30893c5e5be5a1c3a2f104d554cc3772a6ef7b81 Parents: 7f2a408 Author: Suma Shivaprasad <[email protected]> Authored: Fri Aug 12 14:03:55 2016 -0700 Committer: Suma Shivaprasad <[email protected]> Committed: Fri Aug 12 14:03:55 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/service/Services.java | 4 +- distro/src/conf/atlas-log4j.xml | 13 ++ .../org/apache/atlas/kafka/KafkaConsumer.java | 5 + .../notification/NotificationConsumer.java | 2 + .../notification/hook/HookNotification.java | 2 +- .../atlas/kafka/KafkaNotificationMockTest.java | 198 ++++++++++++++++ .../atlas/kafka/KafkaNotificationTest.java | 233 ++++++------------- .../AbstractNotificationConsumerTest.java | 5 + .../classloader/AtlasPluginClassLoader.java | 32 +-- release-log.txt | 1 + .../atlas/GraphTransactionInterceptor.java | 2 +- typesystem/src/main/resources/atlas-log4j.xml | 13 ++ .../test/resources/atlas-application.properties | 4 +- .../notification/NotificationHookConsumer.java | 113 ++++++--- .../atlas/web/listeners/GuiceServletConfig.java | 13 +- webapp/src/main/webapp/WEB-INF/web.xml | 8 +- .../NotificationHookConsumerKafkaTest.java | 19 +- .../NotificationHookConsumerTest.java | 31 ++- 18 files changed, 444 insertions(+), 254 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/common/src/main/java/org/apache/atlas/service/Services.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java index 8b2e205..588dd8e 100644 --- a/common/src/main/java/org/apache/atlas/service/Services.java +++ b/common/src/main/java/org/apache/atlas/service/Services.java @@ -41,7 +41,7 @@ public class Services { public void start() { try { for (Service service : services) { - LOG.debug("Starting service {}", service.getClass().getName()); + LOG.info("Starting service {}", service.getClass().getName()); service.start(); } } catch (Exception e) { @@ -51,7 +51,7 @@ public class Services { public void stop() { for (Service service : services) { - LOG.debug("Stopping service {}", service.getClass().getName()); + LOG.info("Stopping service {}", service.getClass().getName()); try { service.stop(); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/distro/src/conf/atlas-log4j.xml ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml index eaa4ec5..400cd3a 100755 --- a/distro/src/conf/atlas-log4j.xml +++ b/distro/src/conf/atlas-log4j.xml @@ -43,6 +43,14 @@ </layout> </appender> + <appender name="FAILED" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${atlas.log.dir}/failed.log"/> + <param name="Append" value="true"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %m"/> + </layout> + </appender> + <logger name="org.apache.atlas" additivity="false"> <level value="info"/> <appender-ref ref="FILE"/> @@ -80,6 +88,11 @@ <appender-ref ref="AUDIT"/> </logger> + <logger name="FAILED" additivity="false"> + <level value="info"/> + <appender-ref ref="AUDIT"/> + </logger> + <root> <priority value="warn"/> <appender-ref ref="FILE"/> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java index 270215d..16c0eb2 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java @@ -96,4 +96,9 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> { LOG.debug("Committed offset: {}", lastSeenOffset); } } + + @Override + public void close() { + consumerConnector.shutdown(); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index 2e861cb..a99cb10 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -52,4 +52,6 @@ public interface NotificationConsumer<T> { * restart. */ void commit(); + + void close(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java index 88a0322..a25aa52 100644 --- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java @@ -156,7 +156,7 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN } } - public List<Referenceable> getEntities() throws JSONException { + public List<Referenceable> getEntities() { return entities; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java new file mode 100644 index 0000000..2126be6 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.kafka; + +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.serializer.StringDecoder; +import org.apache.atlas.notification.MessageDeserializer; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationException; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class KafkaNotificationMockTest { + + @Test + @SuppressWarnings("unchecked") + public void testCreateConsumers() throws Exception { + Properties properties = mock(Properties.class); + when(properties.getProperty("entities.group.id")).thenReturn("atlas"); + final ConsumerConnector consumerConnector = mock(ConsumerConnector.class); + Map<String, Integer> topicCountMap = new HashMap<>(); + topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1); + + Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap = + new HashMap<>(); + List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>(); + KafkaStream kafkaStream = mock(KafkaStream.class); + kafkaStreams.add(kafkaStream); + kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams); + + when(consumerConnector.createMessageStreams( + eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap); + + final KafkaConsumer consumer1 = mock(KafkaConsumer.class); + final KafkaConsumer consumer2 = mock(KafkaConsumer.class); + + KafkaNotification kafkaNotification = + new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2); + + List<NotificationConsumer<String>> consumers = + kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2); + + verify(consumerConnector, times(2)).createMessageStreams( + eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class)); + assertEquals(consumers.size(), 2); + assertTrue(consumers.contains(consumer1)); + assertTrue(consumers.contains(consumer2)); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldSendMessagesSuccessfully() throws NotificationException, + ExecutionException, InterruptedException { + Properties configProperties = mock(Properties.class); + KafkaNotification kafkaNotification = new KafkaNotification(configProperties); + + Producer producer = mock(Producer.class); + String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String message = "This is a test message"; + Future returnValue = mock(Future.class); + when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0)); + ProducerRecord expectedRecord = new ProducerRecord(topicName, message); + when(producer.send(expectedRecord)).thenReturn(returnValue); + + kafkaNotification.sendInternalToProducer(producer, + NotificationInterface.NotificationType.HOOK, new String[]{message}); + + verify(producer).send(expectedRecord); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldThrowExceptionIfProducerFails() throws NotificationException, + ExecutionException, InterruptedException { + Properties configProperties = mock(Properties.class); + KafkaNotification kafkaNotification = new KafkaNotification(configProperties); + + Producer producer = mock(Producer.class); + String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String message = "This is a test message"; + Future returnValue = mock(Future.class); + when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception")); + ProducerRecord expectedRecord = new ProducerRecord(topicName, message); + when(producer.send(expectedRecord)).thenReturn(returnValue); + + try { + kafkaNotification.sendInternalToProducer(producer, + NotificationInterface.NotificationType.HOOK, new String[]{message}); + fail("Should have thrown NotificationException"); + } catch (NotificationException e) { + assertEquals(e.getFailedMessages().size(), 1); + assertEquals(e.getFailedMessages().get(0), "This is a test message"); + } + } + + @Test + @SuppressWarnings("unchecked") + public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException, + ExecutionException, InterruptedException { + Properties configProperties = mock(Properties.class); + KafkaNotification kafkaNotification = new KafkaNotification(configProperties); + + Producer producer = mock(Producer.class); + String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String message1 = "This is a test message1"; + String message2 = "This is a test message2"; + Future returnValue1 = mock(Future.class); + when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception")); + Future returnValue2 = mock(Future.class); + when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception")); + ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1); + when(producer.send(expectedRecord1)).thenReturn(returnValue1); + ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2); + when(producer.send(expectedRecord2)).thenReturn(returnValue1); + + try { + kafkaNotification.sendInternalToProducer(producer, + NotificationInterface.NotificationType.HOOK, new String[]{message1, message2}); + fail("Should have thrown NotificationException"); + } catch (NotificationException e) { + assertEquals(e.getFailedMessages().size(), 2); + assertEquals(e.getFailedMessages().get(0), "This is a test message1"); + assertEquals(e.getFailedMessages().get(1), "This is a test message2"); + } + } + + class TestKafkaNotification extends KafkaNotification { + + private final ConsumerConnector consumerConnector; + private final KafkaConsumer consumer1; + private final KafkaConsumer consumer2; + + TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector, + KafkaConsumer consumer1, KafkaConsumer consumer2) { + super(properties); + this.consumerConnector = consumerConnector; + this.consumer1 = consumer1; + this.consumer2 = consumer2; + } + + @Override + protected ConsumerConnector createConsumerConnector(Properties consumerProperties) { + return consumerConnector; + } + + @Override + protected <T> org.apache.atlas.kafka.KafkaConsumer<T> + createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream, + int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) { + if (consumerId == 0) { + return consumer1; + } else if (consumerId == 1) { + return consumer2; + } + return null; + } + + + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index 2a49634..a810029 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -15,184 +15,93 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.atlas.kafka; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.serializer.StringDecoder; -import org.apache.atlas.notification.MessageDeserializer; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.notification.NotificationConsumer; -import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.TopicPartition; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.RandomStringUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; public class KafkaNotificationTest { - @Test - @SuppressWarnings("unchecked") - public void testCreateConsumers() throws Exception { - Properties properties = mock(Properties.class); - when(properties.getProperty("entities.group.id")).thenReturn("atlas"); - final ConsumerConnector consumerConnector = mock(ConsumerConnector.class); - Map<String, Integer> topicCountMap = new HashMap<>(); - topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1); - - Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap = - new HashMap<>(); - List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>(); - KafkaStream kafkaStream = mock(KafkaStream.class); - kafkaStreams.add(kafkaStream); - kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams); - - when(consumerConnector.createMessageStreams( - eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap); - - final KafkaConsumer consumer1 = mock(KafkaConsumer.class); - final KafkaConsumer consumer2 = mock(KafkaConsumer.class); - - KafkaNotification kafkaNotification = - new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2); - - List<NotificationConsumer<String>> consumers = - kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2); - - verify(consumerConnector, times(2)).createMessageStreams( - eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class)); - assertEquals(consumers.size(), 2); - assertTrue(consumers.contains(consumer1)); - assertTrue(consumers.contains(consumer2)); - } + private KafkaNotification kafkaNotification; - @Test - @SuppressWarnings("unchecked") - public void shouldSendMessagesSuccessfully() throws NotificationException, - ExecutionException, InterruptedException { - Properties configProperties = mock(Properties.class); - KafkaNotification kafkaNotification = new KafkaNotification(configProperties); - - Producer producer = mock(Producer.class); - String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); - String message = "This is a test message"; - Future returnValue = mock(Future.class); - when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0)); - ProducerRecord expectedRecord = new ProducerRecord(topicName, message); - when(producer.send(expectedRecord)).thenReturn(returnValue); - - kafkaNotification.sendInternalToProducer(producer, - NotificationInterface.NotificationType.HOOK, new String[]{message}); - - verify(producer).send(expectedRecord); - } + @BeforeClass + public void setup() throws Exception { + Configuration properties = ApplicationProperties.get(); + properties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); - @Test - @SuppressWarnings("unchecked") - public void shouldThrowExceptionIfProducerFails() throws NotificationException, - ExecutionException, InterruptedException { - Properties configProperties = mock(Properties.class); - KafkaNotification kafkaNotification = new KafkaNotification(configProperties); - - Producer producer = mock(Producer.class); - String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); - String message = "This is a test message"; - Future returnValue = mock(Future.class); - when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception")); - ProducerRecord expectedRecord = new ProducerRecord(topicName, message); - when(producer.send(expectedRecord)).thenReturn(returnValue); - - try { - kafkaNotification.sendInternalToProducer(producer, - NotificationInterface.NotificationType.HOOK, new String[]{message}); - fail("Should have thrown NotificationException"); - } catch (NotificationException e) { - assertEquals(e.getFailedMessages().size(), 1); - assertEquals(e.getFailedMessages().get(0), "This is a test message"); - } + kafkaNotification = new KafkaNotification(properties); + kafkaNotification.start(); } - @Test - @SuppressWarnings("unchecked") - public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException, - ExecutionException, InterruptedException { - Properties configProperties = mock(Properties.class); - KafkaNotification kafkaNotification = new KafkaNotification(configProperties); - - Producer producer = mock(Producer.class); - String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); - String message1 = "This is a test message1"; - String message2 = "This is a test message2"; - Future returnValue1 = mock(Future.class); - when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception")); - Future returnValue2 = mock(Future.class); - when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception")); - ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1); - when(producer.send(expectedRecord1)).thenReturn(returnValue1); - ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2); - when(producer.send(expectedRecord2)).thenReturn(returnValue1); - - try { - kafkaNotification.sendInternalToProducer(producer, - NotificationInterface.NotificationType.HOOK, new String[]{message1, message2}); - fail("Should have thrown NotificationException"); - } catch (NotificationException e) { - assertEquals(e.getFailedMessages().size(), 2); - assertEquals(e.getFailedMessages().get(0), "This is a test message1"); - assertEquals(e.getFailedMessages().get(1), "This is a test message2"); - } + @AfterClass + public void shutdown() throws Exception { + kafkaNotification.close(); + kafkaNotification.stop(); } - class TestKafkaNotification extends KafkaNotification { - - private final ConsumerConnector consumerConnector; - private final KafkaConsumer consumer1; - private final KafkaConsumer consumer2; - - TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector, - KafkaConsumer consumer1, KafkaConsumer consumer2) { - super(properties); - this.consumerConnector = consumerConnector; - this.consumer1 = consumer1; - this.consumer2 = consumer2; - } - - @Override - protected ConsumerConnector createConsumerConnector(Properties consumerProperties) { - return consumerConnector; - } - - @Override - protected <T> org.apache.atlas.kafka.KafkaConsumer<T> - createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream stream, - int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) { - if (consumerId == 0) { - return consumer1; - } else if (consumerId == 1) { - return consumer2; - } - return null; - } - - + @Test + public void testNext() throws Exception { + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, + new HookNotification.EntityCreateRequest("u1", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, + new HookNotification.EntityCreateRequest("u2", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, + new HookNotification.EntityCreateRequest("u3", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, + new HookNotification.EntityCreateRequest("u4", new Referenceable("type"))); + + NotificationConsumer<Object> consumer = + kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); + assertTrue(consumer.hasNext()); + HookNotification.HookNotificationMessage message = (HookNotification.HookNotificationMessage) consumer.next(); + assertEquals(message.getUser(), "u1"); + + assertTrue(consumer.hasNext()); + message = (HookNotification.HookNotificationMessage) consumer.next(); + assertEquals(message.getUser(), "u2"); + consumer.close(); + + //nothing committed(even though u1 and u2 are read), now should restart from u1 + consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); + assertTrue(consumer.hasNext()); + message = (HookNotification.HookNotificationMessage) consumer.next(); + assertEquals(message.getUser(), "u1"); + consumer.commit(); + + assertTrue(consumer.hasNext()); + message = (HookNotification.HookNotificationMessage) consumer.next(); + assertEquals(message.getUser(), "u2"); + consumer.close(); + + //u1 committed, u2 read, should start from u2 + consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); + assertTrue(consumer.hasNext()); + message = (HookNotification.HookNotificationMessage) consumer.next(); + assertEquals(message.getUser(), "u2"); + + assertTrue(consumer.hasNext()); + message = (HookNotification.HookNotificationMessage) consumer.next(); + assertEquals(message.getUser(), "u3"); + consumer.commit(); + consumer.close(); + + //u2, u3 read, but only u3 committed, should start from u4 + consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); + assertTrue(consumer.hasNext()); + message = (HookNotification.HookNotificationMessage) consumer.next(); + assertEquals(message.getUser(), "u4"); + consumer.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index 0c8990f..ed5b9fc 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -262,6 +262,11 @@ public class AbstractNotificationConsumerTest { public void commit() { // do nothing. } + + @Override + public void close() { + //do nothing + } } private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java ---------------------------------------------------------------------- diff --git a/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java b/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java index 92cc406..0dbf352 100644 --- a/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java +++ b/plugin-classloader/src/main/java/org/apache/atlas/plugin/classloader/AtlasPluginClassLoader.java @@ -75,16 +75,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader { @Override public Class<?> findClass(String name) throws ClassNotFoundException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasPluginClassLoader.findClass(" + name + ")"); + if (LOG.isTraceEnabled()) { + LOG.trace("==> AtlasPluginClassLoader.findClass(" + name + ")"); } Class<?> ret = null; try { // first try to find the class in pluginClassloader - if (LOG.isDebugEnabled()) { - LOG.debug("AtlasPluginClassLoader.findClass(" + name + "): calling pluginClassLoader.findClass()"); + if (LOG.isTraceEnabled()) { + LOG.trace("AtlasPluginClassLoader.findClass(" + name + "): calling pluginClassLoader.findClass()"); } ret = super.findClass(name); @@ -93,8 +93,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader { MyClassLoader savedClassLoader = getComponentClassLoader(); if (savedClassLoader != null) { - if (LOG.isDebugEnabled()) { - LOG.debug( + if (LOG.isTraceEnabled()) { + LOG.trace( "AtlasPluginClassLoader.findClass(" + name + "): calling componentClassLoader.findClass()"); } @@ -102,8 +102,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader { } } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasPluginClassLoader.findClass(" + name + "): " + ret); + if (LOG.isTraceEnabled()) { + LOG.trace("<== AtlasPluginClassLoader.findClass(" + name + "): " + ret); } return ret; @@ -111,16 +111,16 @@ public final class AtlasPluginClassLoader extends URLClassLoader { @Override public Class<?> loadClass(String name) throws ClassNotFoundException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasPluginClassLoader.loadClass(" + name + ")"); + if (LOG.isTraceEnabled()) { + LOG.trace("==> AtlasPluginClassLoader.loadClass(" + name + ")"); } Class<?> ret = null; try { // first try to load the class from pluginClassloader - if (LOG.isDebugEnabled()) { - LOG.debug("AtlasPluginClassLoader.loadClass(" + name + "): calling pluginClassLoader.loadClass()"); + if (LOG.isTraceEnabled()) { + LOG.trace("AtlasPluginClassLoader.loadClass(" + name + "): calling pluginClassLoader.loadClass()"); } ret = super.loadClass(name); @@ -129,8 +129,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader { MyClassLoader savedClassLoader = getComponentClassLoader(); if (savedClassLoader != null) { - if (LOG.isDebugEnabled()) { - LOG.debug( + if (LOG.isTraceEnabled()) { + LOG.trace( "AtlasPluginClassLoader.loadClass(" + name + "): calling componentClassLoader.loadClass()"); } @@ -138,8 +138,8 @@ public final class AtlasPluginClassLoader extends URLClassLoader { } } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasPluginClassLoader.loadClass(" + name + "): " + ret); + if (LOG.isTraceEnabled()) { + LOG.trace("<== AtlasPluginClassLoader.loadClass(" + name + "): " + ret); } return ret; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 10667ad..53eeb14 100644 --- a/release-log.txt +++ b/release-log.txt @@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES: ATLAS-1060 Add composite indexes for exact match performance improvements for all attributes (sumasai via shwethags) ALL CHANGES: +ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai) ATLAS-1115 Show Tag / Taxonomy Listing in sorted order (Kalyanikashikar via sumasai) ATLAS-1117 Atlas start fails on trunk (jnhagelb via dkantor) ATLAS-1112 Hive table GET response from atlas server had duplicate column entries ( ayubkhan, mneethiraj via sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index b9689f4..20e8ebc 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -41,7 +41,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor { try { Object response = invocation.proceed(); titanGraph.commit(); - LOG.debug("graph commit"); + LOG.info("graph commit"); return response; } catch (Throwable t) { titanGraph.rollback(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/typesystem/src/main/resources/atlas-log4j.xml ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/atlas-log4j.xml b/typesystem/src/main/resources/atlas-log4j.xml index 0f7573e..5a48854 100755 --- a/typesystem/src/main/resources/atlas-log4j.xml +++ b/typesystem/src/main/resources/atlas-log4j.xml @@ -57,6 +57,19 @@ </logger> --> + <appender name="FAILED" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${atlas.log.dir}/failed.log"/> + <param name="Append" value="true"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %m"/> + </layout> + </appender> + + <logger name="FAILED" additivity="false"> + <level value="info"/> + <appender-ref ref="AUDIT"/> + </logger> + <logger name="com.thinkaurelius.titan" additivity="false"> <level value="info"/> <appender-ref ref="console"/> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/typesystem/src/test/resources/atlas-application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties index a3b6c90..fb31462 100644 --- a/typesystem/src/test/resources/atlas-application.properties +++ b/typesystem/src/test/resources/atlas-application.properties @@ -77,7 +77,7 @@ atlas.kafka.bootstrap.servers=localhost:19027 atlas.kafka.data=${sys:atlas.data}/kafka atlas.kafka.zookeeper.session.timeout.ms=4000 atlas.kafka.zookeeper.sync.time.ms=20 -atlas.kafka.consumer.timeout.ms=100 +atlas.kafka.consumer.timeout.ms=4000 atlas.kafka.auto.commit.interval.ms=100 atlas.kafka.hook.group.id=atlas atlas.kafka.entities.group.id=atlas_entities @@ -122,4 +122,4 @@ atlas.auth.policy.file=${sys:user.dir}/distro/src/conf/policy-store.txt atlas.authentication.method.file=true atlas.authentication.method.ldap.type=none atlas.authentication.method.file.filename=${sys:user.dir}/distro/src/conf/users-credentials.properties -atlas.authentication.method.kerberos=false \ No newline at end of file +atlas.authentication.method.kerberos=false http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 901b1ed..6b1f3f2 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -24,6 +24,7 @@ import com.google.inject.Singleton; import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; @@ -46,11 +47,18 @@ import java.util.concurrent.atomic.AtomicBoolean; @Singleton public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); + private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); + private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; + public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; + public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize"; + public static final int SERVER_READY_WAIT_TIME_MS = 1000; private final LocalAtlasClient atlasClient; + private final int maxRetries; + private final int failedMsgCacheSize; private NotificationInterface notificationInterface; private ExecutorService executors; @@ -58,20 +66,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private List<HookConsumer> consumers; @Inject - public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) { + public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient atlasClient) + throws AtlasException { this.notificationInterface = notificationInterface; this.atlasClient = atlasClient; + this.applicationProperties = ApplicationProperties.get(); + + maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); + failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); + } @Override public void start() throws AtlasException { - Configuration configuration = ApplicationProperties.get(); - startInternal(configuration, null); + startInternal(applicationProperties, null); } - void startInternal(Configuration configuration, - ExecutorService executorService) { - this.applicationProperties = configuration; + void startInternal(Configuration configuration, ExecutorService executorService) { if (consumers == null) { consumers = new ArrayList<>(); } @@ -103,16 +114,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Override public void stop() { //Allow for completion of outstanding work - notificationInterface.close(); try { + stopConsumerThreads(); if (executors != null) { - stopConsumerThreads(); - executors.shutdownNow(); + executors.shutdown(); if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } executors = null; } + notificationInterface.close(); } catch (InterruptedException e) { LOG.error("Failure in shutting down consumers"); } @@ -160,6 +171,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl class HookConsumer implements Runnable { private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer; private final AtomicBoolean shouldRun = new AtomicBoolean(false); + private List<HookNotification.HookNotificationMessage> failedMessages = new ArrayList<>(); public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { this.consumer = consumer; @@ -193,45 +205,71 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } @VisibleForTesting - void handleMessage(HookNotification.HookNotificationMessage message) { - atlasClient.setUser(message.getUser()); - try { - switch (message.getType()) { - case ENTITY_CREATE: - HookNotification.EntityCreateRequest createRequest = + void handleMessage(HookNotification.HookNotificationMessage message) throws + AtlasServiceException, AtlasException { + for (int numRetries = 0; numRetries < maxRetries; numRetries++) { + LOG.debug("Running attempt {}", numRetries); + try { + atlasClient.setUser(message.getUser()); + switch (message.getType()) { + case ENTITY_CREATE: + HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) message; - atlasClient.createEntity(createRequest.getEntities()); - break; + atlasClient.createEntity(createRequest.getEntities()); + break; - case ENTITY_PARTIAL_UPDATE: - HookNotification.EntityPartialUpdateRequest partialUpdateRequest = + case ENTITY_PARTIAL_UPDATE: + HookNotification.EntityPartialUpdateRequest partialUpdateRequest = (HookNotification.EntityPartialUpdateRequest) message; - atlasClient.updateEntity(partialUpdateRequest.getTypeName(), + atlasClient.updateEntity(partialUpdateRequest.getTypeName(), partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity()); - break; + break; - case ENTITY_DELETE: - HookNotification.EntityDeleteRequest deleteRequest = - (HookNotification.EntityDeleteRequest) message; - atlasClient.deleteEntity(deleteRequest.getTypeName(), - deleteRequest.getAttribute(), - deleteRequest.getAttributeValue()); - break; + case ENTITY_DELETE: + HookNotification.EntityDeleteRequest deleteRequest = + (HookNotification.EntityDeleteRequest) message; + atlasClient.deleteEntity(deleteRequest.getTypeName(), + deleteRequest.getAttribute(), + deleteRequest.getAttributeValue()); + break; - case ENTITY_FULL_UPDATE: - HookNotification.EntityUpdateRequest updateRequest = + case ENTITY_FULL_UPDATE: + HookNotification.EntityUpdateRequest updateRequest = (HookNotification.EntityUpdateRequest) message; - atlasClient.updateEntities(updateRequest.getEntities()); - break; + atlasClient.updateEntities(updateRequest.getEntities()); + break; - default: - throw new IllegalStateException("Unhandled exception!"); + default: + throw new IllegalStateException("Unhandled exception!"); + } + + break; + } catch (Throwable e) { + LOG.warn("Error handling message", e); + if (numRetries == (maxRetries - 1)) { + LOG.warn("Max retries exceeded for message {}", message, e); + failedMessages.add(message); + if (failedMessages.size() >= failedMsgCacheSize) { + recordFailedMessages(); + } + return; + } } - } catch (Exception e) { - //todo handle failures - LOG.warn("Error handling message {}", message, e); } + commit(); + } + + private void recordFailedMessages() { + //logging failed messages + for (HookNotification.HookNotificationMessage message : failedMessages) { + FAILED_LOG.error("[DROPPED_NOTIFICATION] " + AbstractNotification.getMessageJson(message)); + } + failedMessages.clear(); + } + + private void commit() { + recordFailedMessages(); consumer.commit(); } @@ -260,6 +298,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public void stop() { shouldRun.set(false); + consumer.close(); } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java index 0a7c5df..a1d3187 100755 --- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java +++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java @@ -155,8 +155,11 @@ public class GuiceServletConfig extends GuiceServletContextListener { @Override public void contextDestroyed(ServletContextEvent servletContextEvent) { - super.contextDestroyed(servletContextEvent); + LOG.info("Starting servlet context destroy"); if(injector != null) { + //stop services + stopServices(); + TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {}; Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType)); final Graph graph = graphProvider.get().get(); @@ -166,15 +169,13 @@ public class GuiceServletConfig extends GuiceServletContextListener { } catch(Throwable t) { LOG.warn("Error while shutting down graph", t); } - - //stop services - stopServices(); } + super.contextDestroyed(servletContextEvent); } protected void stopServices() { - LOG.debug("Stopping services"); + LOG.info("Stopping services"); Services services = injector.getInstance(Services.class); services.stop(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/webapp/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml index 34b6856..2e36b94 100755 --- a/webapp/src/main/webapp/WEB-INF/web.xml +++ b/webapp/src/main/webapp/WEB-INF/web.xml @@ -63,11 +63,11 @@ </filter-mapping> <listener> - <listener-class>org.apache.atlas.web.listeners.GuiceServletConfig</listener-class> + <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class> </listener> - + <listener> - <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class> + <listener-class>org.apache.atlas.web.listeners.GuiceServletConfig</listener-class> </listener> <listener> @@ -80,4 +80,4 @@ -</web-app> \ No newline at end of file +</web-app> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 6fd1939..683a028 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -21,11 +21,13 @@ package org.apache.atlas.notification; import com.google.inject.Inject; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.commons.lang.RandomStringUtils; +import org.testng.Assert; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Guice; @@ -54,7 +56,6 @@ public class NotificationHookConsumerKafkaTest { @Test public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException { - produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); NotificationConsumer<HookNotification.HookNotificationMessage> consumer = @@ -68,7 +69,6 @@ public class NotificationHookConsumerKafkaTest { consumeOneMessage(consumer, hookConsumer); verify(localAtlasClient).setUser("test_user1"); - // produce another message, and make sure it moves ahead. If commit succeeded, this would work. produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); consumeOneMessage(consumer, hookConsumer); @@ -77,10 +77,8 @@ public class NotificationHookConsumerKafkaTest { kafkaNotification.close(); } - @Test - public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() - throws NotificationException, InterruptedException { - + @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled") + public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception { produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity())); NotificationConsumer<HookNotification.HookNotificationMessage> consumer = @@ -114,7 +112,14 @@ public class NotificationHookConsumerKafkaTest { while (!consumer.hasNext()) { Thread.sleep(1000); } - hookConsumer.handleMessage(consumer.next()); + + try { + hookConsumer.handleMessage(consumer.next()); + } catch (AtlasServiceException e) { + Assert.fail("Consumer failed with exception ", e); + } catch (AtlasException e) { + Assert.fail("Consumer failed with exception ", e); + } } Referenceable createEntity() { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30893c5e/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index d22c5f1..f06f791 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -18,10 +18,12 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.LocalAtlasClient; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; import org.apache.commons.configuration.Configuration; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -62,7 +64,7 @@ public class NotificationHookConsumerTest { } @Test - public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException { + public void testConsumerCanProceedIfServerIsReady() throws Exception { NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); @@ -75,7 +77,7 @@ public class NotificationHookConsumerTest { } @Test - public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException { + public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); @@ -88,7 +90,7 @@ public class NotificationHookConsumerTest { } @Test - public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException { + public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationConsumer consumer = mock(NotificationConsumer.class); @@ -104,25 +106,22 @@ public class NotificationHookConsumerTest { } @Test - public void testCommitIsCalledEvenWhenMessageProcessingFails() throws AtlasServiceException { + public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException { NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); - HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); - when(message.getUser()).thenReturn("user"); - when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE); + HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", new ArrayList<Referenceable>()); when(atlasClient.createEntity(any(List.class))). thenThrow(new RuntimeException("Simulating exception in processing message")); - hookConsumer.handleMessage(message); - verify(consumer).commit(); + verifyZeroInteractions(consumer); } @Test - public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException { + public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); @@ -134,7 +133,7 @@ public class NotificationHookConsumerTest { } @Test - public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException { + public void testConsumerProceedsWithFalseOnAtlasServiceException() throws Exception { NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); @@ -146,7 +145,7 @@ public class NotificationHookConsumerTest { } @Test - public void testConsumersStartedIfHAIsDisabled() { + public void testConsumersStartedIfHAIsDisabled() throws Exception { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); List<NotificationConsumer<Object>> consumers = new ArrayList(); @@ -160,7 +159,7 @@ public class NotificationHookConsumerTest { } @Test - public void testConsumersAreNotStartedIfHAIsEnabled() { + public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); @@ -174,7 +173,7 @@ public class NotificationHookConsumerTest { } @Test - public void testConsumersAreStartedWhenInstanceBecomesActive() { + public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); @@ -190,7 +189,7 @@ public class NotificationHookConsumerTest { } @Test - public void testConsumersAreStoppedWhenInstanceBecomesPassive() { + public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); List<NotificationConsumer<Object>> consumers = new ArrayList(); @@ -201,6 +200,6 @@ public class NotificationHookConsumerTest { notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); verify(notificationInterface).close(); - verify(executorService).shutdownNow(); + verify(executorService).shutdown(); } }
