ATLAS-1908: updated NotificationConsumer with a method removed earlier to avoid breaking existing usage (like in Ranger)
Change-Id: Ib8a7f338da7fd0f710fc683da87871e3d9c32035 Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bcabde9b Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bcabde9b Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bcabde9b Branch: refs/heads/feature-odf Commit: bcabde9bbe8361bc7b4461b395dd2ffcb0906962 Parents: eddab3b Author: nixonrodrigues <[email protected]> Authored: Sun Jul 16 00:14:29 2017 +0530 Committer: Madhan Neethiraj <[email protected]> Committed: Sat Jul 15 14:50:04 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/kafka/AtlasKafkaConsumer.java | 9 ++++++++- .../org/apache/atlas/notification/NotificationConsumer.java | 9 +++++++++ .../notification/AbstractNotificationConsumerTest.java | 5 +++++ 3 files changed, 22 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/bcabde9b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index 52d0916..d431176 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -52,9 +52,15 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { } public List<AtlasKafkaMessage<T>> receive() { + return this.receive(this.pollTimeoutMilliSeconds); + } + + @Override + public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) { + List<AtlasKafkaMessage<T>> messages = new ArrayList(); - ConsumerRecords<?, ?> records = kafkaConsumer.poll(pollTimeoutMilliSeconds); + ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds); if (records != null) { for (ConsumerRecord<?, ?> record : records) { @@ -70,6 +76,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { } return messages; + } http://git-wip-us.apache.org/repos/asf/atlas/blob/bcabde9b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index 6d1c08a..0bd75e1 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -44,4 +44,13 @@ public interface NotificationConsumer<T> { * @return List containing kafka message and partionId and offset. */ List<AtlasKafkaMessage<T>> receive(); + + /** + * Fetch data for the topics from Kafka + * @param timeoutMilliSeconds poll timeout + * @return List containing kafka message and partionId and offset. + */ + List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds); + + } http://git-wip-us.apache.org/repos/asf/atlas/blob/bcabde9b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index 68fe3d7..bcee00c 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -204,6 +204,11 @@ public class AbstractNotificationConsumerTest { @Override public List<AtlasKafkaMessage<T>> receive() { + return receive(1000L); + } + + @Override + public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) { List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList(); for(Object json : messageList) { tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
