Repository: incubator-atlas Updated Branches: refs/heads/master 86dd72aff -> a2e7738aa
ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a2e7738a Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a2e7738a Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a2e7738a Branch: refs/heads/master Commit: a2e7738aa25bba20bafa5f42ee1d628807a26b52 Parents: 86dd72a Author: Hemanth Yamijala <[email protected]> Authored: Fri Jun 17 14:58:13 2016 +0530 Committer: Hemanth Yamijala <[email protected]> Committed: Fri Jun 17 14:58:33 2016 +0530 ---------------------------------------------------------------------- docs/src/site/twiki/Configuration.twiki | 10 ++- .../java/org/apache/atlas/hook/AtlasHook.java | 36 +++++++- .../apache/atlas/hook/FailedMessagesLogger.java | 95 ++++++++++++++++++++ .../apache/atlas/kafka/KafkaNotification.java | 46 ++++++++-- .../notification/NotificationException.java | 13 +++ .../org/apache/atlas/hook/AtlasHookTest.java | 91 +++++++++++++++++-- .../atlas/kafka/KafkaNotificationTest.java | 85 ++++++++++++++++++ release-log.txt | 1 + 8 files changed, 362 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/docs/src/site/twiki/Configuration.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki index 0e122fe..3ad0fbe 100644 --- a/docs/src/site/twiki/Configuration.twiki +++ b/docs/src/site/twiki/Configuration.twiki @@ -168,9 +168,17 @@ atlas.notification.replicas=1 atlas.notification.kafka.service.principal=kafka/[email protected] # Set this to the location of the keytab file for Kafka #atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab - </verbatim> +These configuration parameters are useful for saving messages in case there are issues in reaching Kafka for +sending messages. + +<verbatim> +# Whether to save messages that failed to be sent to Kafka, default is true +atlas.notification.log.failed.messages=true +# If saving messages is enabled, the file name to save them to. This file will be created under the log directory of the hook's host component - like HiveServer2 +atlas.notification.failed.messages.filename=atlas_hook_failed_messages.log +</verbatim> ---++ Client Configs <verbatim> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 71029b0..2ca8d85 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -18,9 +18,11 @@ package org.apache.atlas.hook; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.notification.hook.HookNotification; @@ -50,6 +52,15 @@ public abstract class AtlasHook { protected static NotificationInterface notifInterface; + private static boolean logFailedMessages; + private static FailedMessagesLogger failedMessagesLogger; + + public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = + "atlas.notification.failed.messages.filename"; + public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log"; + public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = + "atlas.notification.log.failed.messages"; + static { try { atlasProperties = ApplicationProperties.get(); @@ -57,6 +68,14 @@ public abstract class AtlasHook { LOG.info("Failed to load application properties", e); } + String failedMessageFile = atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY, + ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME); + logFailedMessages = atlasProperties.getBoolean(ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY, true); + if (logFailedMessages) { + failedMessagesLogger = new FailedMessagesLogger(failedMessageFile); + failedMessagesLogger.init(); + } + Injector injector = Guice.createInjector(new NotificationModule()); notifInterface = injector.getInstance(NotificationInterface.class); @@ -89,18 +108,31 @@ public abstract class AtlasHook { * @param maxRetries maximum number of retries while sending message to messaging system */ public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) { + notifyEntitiesInternal(messages, maxRetries, notifInterface, logFailedMessages, failedMessagesLogger); + } + + @VisibleForTesting + static void notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, int maxRetries, + NotificationInterface notificationInterface, + boolean shouldLogFailedMessages, FailedMessagesLogger logger) { final String message = messages.toString(); int numRetries = 0; while (true) { try { - notifInterface.send(NotificationInterface.NotificationType.HOOK, messages); + notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages); return; - } catch(Exception e) { + } catch (Exception e) { numRetries++; if (numRetries < maxRetries) { LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e); } else { + if (shouldLogFailedMessages && e instanceof NotificationException) { + List<String> failedMessages = ((NotificationException) e).getFailedMessages(); + for (String msg : failedMessages) { + logger.log(msg); + } + } LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", message, maxRetries, e); return; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java new file mode 100644 index 0000000..0b552d3 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hook; + + +import org.apache.log4j.Appender; +import org.apache.log4j.DailyRollingFileAppender; +import org.apache.log4j.FileAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; + +import java.io.File; +import java.io.IOException; +import java.util.Enumeration; + +/** + * A logger wrapper that can be used to write messages that failed to be sent to a log file. + */ +public class FailedMessagesLogger { + + public static final String PATTERN_SPEC_TIMESTAMP_MESSAGE_NEWLINE = "%d{ISO8601} %m%n"; + public static final String DATE_PATTERN = ".yyyy-MM-dd"; + + private final Logger logger = Logger.getLogger("org.apache.atlas.hook.FailedMessagesLogger"); + private String failedMessageFile; + + public FailedMessagesLogger(String failedMessageFile) { + this.failedMessageFile = failedMessageFile; + } + + void init() { + String rootLoggerDirectory = getRootLoggerDirectory(); + if (rootLoggerDirectory == null) { + return; + } + String absolutePath = new File(rootLoggerDirectory, failedMessageFile).getAbsolutePath(); + try { + DailyRollingFileAppender failedLogFilesAppender = new DailyRollingFileAppender( + new PatternLayout(PATTERN_SPEC_TIMESTAMP_MESSAGE_NEWLINE), absolutePath, DATE_PATTERN); + logger.addAppender(failedLogFilesAppender); + logger.setLevel(Level.ERROR); + logger.setAdditivity(false); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Get the root logger file location under which the failed log messages will be written. + * + * Since this class is used in Hooks which run within JVMs of other components like Hive, + * we want to write the failed messages file under the same location as where logs from + * the host component are saved. This method attempts to get such a location from the + * root logger's appenders. It will work only if at least one of the appenders is a {@link FileAppender} + * + * @return directory under which host component's logs are stored. + */ + private String getRootLoggerDirectory() { + String rootLoggerDirectory = null; + org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger(); + + Enumeration allAppenders = rootLogger.getAllAppenders(); + while (allAppenders.hasMoreElements()) { + Appender appender = (Appender) allAppenders.nextElement(); + if (appender instanceof FileAppender) { + FileAppender fileAppender = (FileAppender) appender; + String rootLoggerFile = fileAppender.getFile(); + rootLoggerDirectory = new File(rootLoggerFile).getParent(); + break; + } + } + return rootLoggerDirectory; + } + + void log(String message) { + logger.error(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 1ee62d2..806f2b4 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -37,6 +37,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationConverter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -90,6 +91,10 @@ public class KafkaNotification extends AbstractNotification implements Service { } }; + @VisibleForTesting + String getTopicName(NotificationType notificationType) { + return TOPIC_MAP.get(notificationType); + } // ----- Constructors ---------------------------------------------------- @@ -214,24 +219,36 @@ public class KafkaNotification extends AbstractNotification implements Service { if (producer == null) { createProducer(); } + sendInternalToProducer(producer, type, messages); + } + @VisibleForTesting + void sendInternalToProducer(Producer p, NotificationType type, String[] messages) throws NotificationException { String topic = TOPIC_MAP.get(type); - List<Future<RecordMetadata>> futures = new ArrayList<>(); + List<MessageContext> messageContexts = new ArrayList<>(); for (String message : messages) { ProducerRecord record = new ProducerRecord(topic, message); LOG.debug("Sending message for topic {}: {}", topic, message); - futures.add(producer.send(record)); + Future future = p.send(record); + messageContexts.add(new MessageContext(future, message)); } - for (Future<RecordMetadata> future : futures) { + List<String> failedMessages = new ArrayList<>(); + Exception lastFailureException = null; + for (MessageContext context : messageContexts) { try { - RecordMetadata response = future.get(); + RecordMetadata response = context.getFuture().get(); LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), response.partition(), response.offset()); } catch (Exception e) { - throw new NotificationException(e); + LOG.warn("Could not send message - {}", context.getMessage(), e); + lastFailureException = e; + failedMessages.add(context.getMessage()); } } + if (lastFailureException != null) { + throw new NotificationException(lastFailureException, failedMessages); + } } // ----- helper methods -------------------------------------------------- @@ -359,4 +376,23 @@ public class KafkaNotification extends AbstractNotification implements Service { } } } + + private class MessageContext { + + private final Future<RecordMetadata> future; + private final String message; + + public MessageContext(Future<RecordMetadata> future, String message) { + this.future = future; + this.message = message; + } + + public Future<RecordMetadata> getFuture() { + return future; + } + + public String getMessage() { + return message; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java index d9d89df..2dd9c9f 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java @@ -19,11 +19,24 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasException; +import java.util.List; + /** * Exception from notification. */ public class NotificationException extends AtlasException { + private List<String> failedMessages; + public NotificationException(Exception e) { super(e); } + + public NotificationException(Exception e, List<String> failedMessages) { + super(e); + this.failedMessages = failedMessages; + } + + public List<String> getFailedMessages() { + return failedMessages; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java index 16cb0f0..9854bcc 100644 --- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java @@ -21,24 +21,101 @@ package org.apache.atlas.hook; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.hook.HookNotification; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; public class AtlasHookTest { + + @Mock + private NotificationInterface notificationInterface; + + @Mock + private FailedMessagesLogger failedMessagesLogger; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test (timeOut = 10000) + public void testNotifyEntitiesDoesNotHangOnException() throws Exception { + List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + doThrow(new NotificationException(new Exception())).when(notificationInterface) + .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, notificationInterface, false, + failedMessagesLogger); + // if we've reached here, the method finished OK. + } + + @Test + public void testNotifyEntitiesRetriesOnException() throws NotificationException { + List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + doThrow(new NotificationException(new Exception())).when(notificationInterface) + .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false, + failedMessagesLogger); + + verify(notificationInterface, times(2)). + send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); + } + + @Test + public void testFailedMessageIsLoggedIfRequired() throws NotificationException { + List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) + .when(notificationInterface) + .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + failedMessagesLogger); + + verify(failedMessagesLogger, times(1)).log("test message"); + } + @Test - public void testnotifyEntities() throws Exception{ + public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException { List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); - NotificationInterface notifInterface = mock(NotificationInterface.class); - doThrow(new NotificationException(new Exception())).when(notifInterface) + doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) + .when(notificationInterface) .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifInterface = notifInterface; - AtlasHook.notifyEntities(hookNotificationMessages, 2); - System.out.println("AtlasHook.notifyEntities() returns successfully"); + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false, + failedMessagesLogger); + + verifyZeroInteractions(failedMessagesLogger); + } + + @Test + public void testAllFailedMessagesAreLogged() throws NotificationException { + List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2"))) + .when(notificationInterface) + .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + failedMessagesLogger); + + verify(failedMessagesLogger, times(1)).log("test message1"); + verify(failedMessagesLogger, times(1)).log("test message2"); + } + + @Test + public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception { + List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + doThrow(new RuntimeException("test message")).when(notificationInterface) + .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); + AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + failedMessagesLogger); + + verifyZeroInteractions(failedMessagesLogger); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index 219bd70..2a49634 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -22,7 +22,12 @@ import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import org.apache.atlas.notification.MessageDeserializer; import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.testng.annotations.Test; import java.util.ArrayList; @@ -30,6 +35,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -39,6 +46,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class KafkaNotificationTest { @@ -77,6 +85,83 @@ public class KafkaNotificationTest { assertTrue(consumers.contains(consumer2)); } + @Test + @SuppressWarnings("unchecked") + public void shouldSendMessagesSuccessfully() throws NotificationException, + ExecutionException, InterruptedException { + Properties configProperties = mock(Properties.class); + KafkaNotification kafkaNotification = new KafkaNotification(configProperties); + + Producer producer = mock(Producer.class); + String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String message = "This is a test message"; + Future returnValue = mock(Future.class); + when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0)); + ProducerRecord expectedRecord = new ProducerRecord(topicName, message); + when(producer.send(expectedRecord)).thenReturn(returnValue); + + kafkaNotification.sendInternalToProducer(producer, + NotificationInterface.NotificationType.HOOK, new String[]{message}); + + verify(producer).send(expectedRecord); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldThrowExceptionIfProducerFails() throws NotificationException, + ExecutionException, InterruptedException { + Properties configProperties = mock(Properties.class); + KafkaNotification kafkaNotification = new KafkaNotification(configProperties); + + Producer producer = mock(Producer.class); + String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String message = "This is a test message"; + Future returnValue = mock(Future.class); + when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception")); + ProducerRecord expectedRecord = new ProducerRecord(topicName, message); + when(producer.send(expectedRecord)).thenReturn(returnValue); + + try { + kafkaNotification.sendInternalToProducer(producer, + NotificationInterface.NotificationType.HOOK, new String[]{message}); + fail("Should have thrown NotificationException"); + } catch (NotificationException e) { + assertEquals(e.getFailedMessages().size(), 1); + assertEquals(e.getFailedMessages().get(0), "This is a test message"); + } + } + + @Test + @SuppressWarnings("unchecked") + public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException, + ExecutionException, InterruptedException { + Properties configProperties = mock(Properties.class); + KafkaNotification kafkaNotification = new KafkaNotification(configProperties); + + Producer producer = mock(Producer.class); + String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String message1 = "This is a test message1"; + String message2 = "This is a test message2"; + Future returnValue1 = mock(Future.class); + when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception")); + Future returnValue2 = mock(Future.class); + when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception")); + ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1); + when(producer.send(expectedRecord1)).thenReturn(returnValue1); + ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2); + when(producer.send(expectedRecord2)).thenReturn(returnValue1); + + try { + kafkaNotification.sendInternalToProducer(producer, + NotificationInterface.NotificationType.HOOK, new String[]{message1, message2}); + fail("Should have thrown NotificationException"); + } catch (NotificationException e) { + assertEquals(e.getFailedMessages().size(), 2); + assertEquals(e.getFailedMessages().get(0), "This is a test message1"); + assertEquals(e.getFailedMessages().get(1), "This is a test message2"); + } + } + class TestKafkaNotification extends KafkaNotification { private final ConsumerConnector consumerConnector; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index b243037..413493f 100644 --- a/release-log.txt +++ b/release-log.txt @@ -23,6 +23,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth) ATLAS-911 Get entity by unique attribute doesn't enforce type (shwethags) ATLAS-899 Fix Hive Hook documentation (sumasai via yhemanth) ATLAS-890 Log received messages in case of error (sumasai via yhemanth)
