Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 348d5fe19 -> bb2d8d9b4


ATLAS-2289: separate embedded kafka/zookeeper start/stop from KafkaNotification

(cherry picked from commit 3ee4f25355be771e70fe714a519c375f5e676a60)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bb2d8d9b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bb2d8d9b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bb2d8d9b

Branch: refs/heads/branch-0.8
Commit: bb2d8d9b47fd48ab2415898035c2f882bf38d5a4
Parents: 348d5fe
Author: Madhan Neethiraj <[email protected]>
Authored: Tue Nov 28 15:03:44 2017 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Thu Nov 30 21:14:29 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/atlas/service/Services.java |  18 +-
 .../apache/atlas/kafka/EmbeddedKafkaServer.java | 185 ++++++++++++++
 .../apache/atlas/kafka/KafkaNotification.java   | 252 +++++++------------
 .../notification/AbstractNotification.java      |  28 ---
 .../atlas/kafka/KafkaNotificationTest.java      |  37 ++-
 .../notification/EntityNotificationIT.java      |  10 +-
 .../NotificationHookConsumerIT.java             |   8 +-
 .../NotificationHookConsumerKafkaTest.java      |  99 ++++----
 .../atlas/web/integration/BaseResourceIT.java   |  33 ++-
 .../web/integration/EntityJerseyResourceIT.java |   2 -
 10 files changed, 404 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/common/src/main/java/org/apache/atlas/service/Services.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java
index 6f880e4..1267dc9 100644
--- a/common/src/main/java/org/apache/atlas/service/Services.java
+++ b/common/src/main/java/org/apache/atlas/service/Services.java
@@ -51,6 +51,7 @@ public class Services {
             try {
                 for (Service service : services) {
                     LOG.info("Starting service {}", 
service.getClass().getName());
+
                     service.start();
                 }
             } catch (Exception e) {
@@ -61,12 +62,17 @@ public class Services {
 
     @PreDestroy
     public void stop() {
-        for (Service service : services) {
-            LOG.info("Stopping service {}", service.getClass().getName());
-            try {
-                service.stop();
-            } catch (Throwable e) {
-                LOG.warn("Error stopping service {}", 
service.getClass().getName(), e);
+        if (configuration.getBoolean("atlas.services.enabled", true)) {
+            for (int idx = services.size() - 1; idx >= 0; idx--) {
+                Service service = services.get(idx);
+
+                LOG.info("Stopping service {}", service.getClass().getName());
+
+                try {
+                    service.stop();
+                } catch (Throwable e) {
+                    LOG.warn("Error stopping service {}", 
service.getClass().getName(), e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
new file mode 100644
index 0000000..33c8296
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.kafka;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.service.Service;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.kafka.clients.producer.*;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import scala.Option;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.*;
+
+
+@Component
+@Order(2)
+public class EmbeddedKafkaServer implements Service {
+    public static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedKafkaServer.class);
+
+    public  static final String PROPERTY_PREFIX   = "atlas.kafka";
+    private static final String ATLAS_KAFKA_DATA  = "data";
+    public  static final String PROPERTY_EMBEDDED = 
"atlas.notification.embedded";
+
+    private final boolean           isEmbedded;
+    private final Properties        properties;
+    private       KafkaServer       kafkaServer;
+    private       ServerCnxnFactory factory;
+
+
+    @Inject
+    public EmbeddedKafkaServer(Configuration applicationProperties) throws 
AtlasException {
+        Configuration kafkaConf = 
ApplicationProperties.getSubsetConfiguration(applicationProperties, 
PROPERTY_PREFIX);
+
+        this.isEmbedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, 
false);
+        this.properties = ConfigurationConverter.getProperties(kafkaConf);
+    }
+
+    @Override
+    public void start() throws AtlasException {
+        LOG.info("==> EmbeddedKafkaServer.start(isEmbedded={})", isEmbedded);
+
+        if (isEmbedded) {
+            try {
+                startZk();
+                startKafka();
+            } catch (Exception e) {
+                throw new AtlasException("Failed to start embedded kafka", e);
+            }
+        } else {
+            LOG.info("==> EmbeddedKafkaServer.start(): not embedded..nothing 
todo");
+        }
+
+        LOG.info("<== EmbeddedKafkaServer.start(isEmbedded={})", isEmbedded);
+    }
+
+    @Override
+    public void stop() {
+        LOG.info("==> EmbeddedKafkaServer.stop(isEmbedded={})", isEmbedded);
+
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+        }
+
+        if (factory != null) {
+            factory.shutdown();
+        }
+
+        LOG.info("<== EmbeddedKafka.stop(isEmbedded={})", isEmbedded);
+    }
+
+    private String startZk() throws IOException, InterruptedException, 
URISyntaxException {
+        String zkValue = properties.getProperty("zookeeper.connect");
+
+        LOG.info("Starting zookeeper at {}", zkValue);
+
+        URL zkAddress    = getURL(zkValue);
+        File snapshotDir = constructDir("zk/txn");
+        File logDir      = constructDir("zk/snap");
+
+        factory = NIOServerCnxnFactory.createFactory(new 
InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
+
+        factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
+
+        String ret = factory.getLocalAddress().getAddress().toString();
+
+        LOG.info("Embedded zookeeper for Kafka started at {}", ret);
+
+        return ret;
+    }
+
+    private void startKafka() throws IOException, URISyntaxException {
+        String kafkaValue = 
properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        LOG.info("Starting kafka at {}", kafkaValue);
+
+        URL        kafkaAddress = getURL(kafkaValue);
+        Properties brokerConfig = properties;
+
+        brokerConfig.setProperty("broker.id", "1");
+        brokerConfig.setProperty("host.name", kafkaAddress.getHost());
+        brokerConfig.setProperty("port", 
String.valueOf(kafkaAddress.getPort()));
+        brokerConfig.setProperty("log.dirs", 
constructDir("kafka").getAbsolutePath());
+        brokerConfig.setProperty("log.flush.interval.messages", 
String.valueOf(1));
+
+        kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new 
SystemTime(), Option.apply(this.getClass().getName()));
+
+        kafkaServer.startup();
+
+        LOG.info("Embedded kafka server started with broker config {}", 
brokerConfig);
+    }
+
+    private File constructDir(String dirPrefix) {
+        File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), 
dirPrefix);
+
+        if (!file.exists() && !file.mkdirs()) {
+            throw new RuntimeException("could not create temp directory: " + 
file.getAbsolutePath());
+        }
+
+        return file;
+    }
+
+    private URL getURL(String url) throws MalformedURLException {
+        try {
+            return new URL(url);
+        } catch (MalformedURLException e) {
+            return new URL("http://" + url);
+        }
+    }
+
+
+    // ----- inner class : SystemTime ----------------------------------------
+    private static class SystemTime implements Time {
+        @Override
+        public long milliseconds() {
+            return System.currentTimeMillis();
+        }
+
+        @Override
+        public long nanoseconds() {
+            return System.nanoTime();
+        }
+
+        @Override
+        public void sleep(long arg0) {
+            try {
+                Thread.sleep(arg0);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 6bb8d73..ecf9a07 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -18,9 +18,6 @@
 package org.apache.atlas.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.Time;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.AbstractNotification;
@@ -37,28 +34,13 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
-import scala.Option;
 
 import javax.inject.Inject;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.ArrayList;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.Future;
 
 /**
@@ -69,22 +51,11 @@ import java.util.concurrent.Future;
 public class KafkaNotification extends AbstractNotification implements Service 
{
     public static final Logger LOG = 
LoggerFactory.getLogger(KafkaNotification.class);
 
-    public static final String PROPERTY_PREFIX = "atlas.kafka";
-
-    private static final String ATLAS_KAFKA_DATA = "data";
-
-    public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
-    public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES";
-
+    public    static final String PROPERTY_PREFIX            = "atlas.kafka";
+    public    static final String ATLAS_HOOK_TOPIC           = "ATLAS_HOOK";
+    public    static final String ATLAS_ENTITIES_TOPIC       = 
"ATLAS_ENTITIES";
     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
 
-    private KafkaServer kafkaServer;
-    private ServerCnxnFactory factory;
-    private Properties properties;
-    private KafkaConsumer consumer = null;
-    private KafkaProducer producer = null;
-    private Long pollTimeOutMs = 1000L;
-
     private static final Map<NotificationType, String> TOPIC_MAP = new 
HashMap<NotificationType, String>() {
         {
             put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
@@ -92,10 +63,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
         }
     };
 
-    @VisibleForTesting
-    String getTopicName(NotificationType notificationType) {
-        return TOPIC_MAP.get(notificationType);
-    }
+    private final Properties    properties;
+    private final Long          pollTimeOutMs;
+    private       KafkaConsumer consumer;
+    private       KafkaProducer producer;
 
     // ----- Constructors ----------------------------------------------------
 
@@ -109,143 +80,159 @@ public class KafkaNotification extends AbstractNotification implements Service {
     @Inject
     public KafkaNotification(Configuration applicationProperties) throws 
AtlasException {
         super(applicationProperties);
-        Configuration subsetConfiguration =
-                
ApplicationProperties.getSubsetConfiguration(applicationProperties, 
PROPERTY_PREFIX);
-        properties = ConfigurationConverter.getProperties(subsetConfiguration);
-        //override to store offset in kafka
-        //todo do we need ability to replay?
+
+        LOG.info("==> KafkaNotification()");
+
+        Configuration kafkaConf = 
ApplicationProperties.getSubsetConfiguration(applicationProperties, 
PROPERTY_PREFIX);
+
+        properties    = ConfigurationConverter.getProperties(kafkaConf);
+        pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000);
 
         //Override default configs
-        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringSerializer");
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringSerializer");
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringDeserializer");
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        pollTimeOutMs = subsetConfiguration.getLong("poll.timeout.ms", 1000);
-        boolean oldApiCommitEnbleFlag = 
subsetConfiguration.getBoolean("auto.commit.enable",false);
+        boolean oldApiCommitEnableFlag = 
kafkaConf.getBoolean("auto.commit.enable", false);
+
         //set old autocommit value if new autoCommit property is not set.
-        properties.put("enable.auto.commit", 
subsetConfiguration.getBoolean("enable.auto.commit", oldApiCommitEnbleFlag));
-        properties.put("session.timeout.ms", 
subsetConfiguration.getString("session.timeout.ms", "30000"));
+        properties.put("enable.auto.commit", 
kafkaConf.getBoolean("enable.auto.commit", oldApiCommitEnableFlag));
+        properties.put("session.timeout.ms", 
kafkaConf.getString("session.timeout.ms", "30000"));
 
+        LOG.info("<== KafkaNotification()");
     }
 
     @VisibleForTesting
     protected KafkaNotification(Properties properties) {
-        this.properties = properties;
+        super();
+
+        LOG.info("==> KafkaNotification()");
+
+        this.properties    = properties;
+        this.pollTimeOutMs = 1000L;
+
+        LOG.info("<== KafkaNotification()");
+    }
+
+    @VisibleForTesting
+    String getTopicName(NotificationType notificationType) {
+        return TOPIC_MAP.get(notificationType);
     }
 
     // ----- Service ---------------------------------------------------------
 
     @Override
     public void start() throws AtlasException {
-        if (isHAEnabled()) {
-            LOG.info("Not starting embedded instances when HA is enabled.");
-            return;
-        }
-        if (isEmbedded()) {
-            try {
-                startZk();
-                startKafka();
-            } catch (Exception e) {
-                throw new AtlasException("Failed to start embedded kafka", e);
-            }
-        }
+        LOG.info("==> KafkaNotification.start()");
+
+        LOG.info("<== KafkaNotification.start()");
     }
 
     @Override
     public void stop() {
-        if (kafkaServer != null) {
-            kafkaServer.shutdown();
-        }
+        LOG.info("==> KafkaNotification.stop()");
 
-        if (factory != null) {
-            factory.shutdown();
-        }
+        LOG.info("<== KafkaNotification.stop()");
     }
 
 
     // ----- NotificationInterface -------------------------------------------
-
     @Override
-    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType,
-                                                             int numConsumers) 
{
-        return createConsumers(notificationType, numConsumers,
-                Boolean.valueOf(properties.getProperty("enable.auto.commit", 
properties.getProperty("auto.commit.enable","false"))));
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType, int numConsumers) {
+        return createConsumers(notificationType, numConsumers, 
Boolean.valueOf(properties.getProperty("enable.auto.commit", 
properties.getProperty("auto.commit.enable","false"))));
     }
 
     @VisibleForTesting
-    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType,
-                                                      int numConsumers, 
boolean autoCommitEnabled) {
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType, int numConsumers, boolean autoCommitEnabled) {
+        LOG.info("==> KafkaNotification.createConsumers(notificationType={}, 
numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, 
autoCommitEnabled);
 
         Properties consumerProperties = 
getConsumerProperties(notificationType);
-
         List<NotificationConsumer<T>> consumers = new ArrayList<>();
         AtlasKafkaConsumer kafkaConsumer = new 
AtlasKafkaConsumer(notificationType.getDeserializer(), 
getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), 
autoCommitEnabled, pollTimeOutMs );
         consumers.add(kafkaConsumer);
+
+        LOG.info("<== KafkaNotification.createConsumers(notificationType={}, 
numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, 
autoCommitEnabled);
+
         return consumers;
     }
 
     @Override
     public void close() {
+        LOG.info("==> KafkaNotification.close()");
+
         if (producer != null) {
             producer.close();
+
             producer = null;
         }
+
+        LOG.info("<== KafkaNotification.close()");
     }
 
 
     // ----- AbstractNotification --------------------------------------------
-
     @Override
     public void sendInternal(NotificationType type, List<String> messages) 
throws NotificationException {
         if (producer == null) {
             createProducer();
         }
+
         sendInternalToProducer(producer, type, messages);
     }
 
     @VisibleForTesting
     void sendInternalToProducer(Producer p, NotificationType type, 
List<String> messages) throws NotificationException {
-        String topic = TOPIC_MAP.get(type);
+        String               topic           = TOPIC_MAP.get(type);
         List<MessageContext> messageContexts = new ArrayList<>();
+
         for (String message : messages) {
             ProducerRecord record = new ProducerRecord(topic, message);
-            LOG.debug("Sending message for topic {}: {}", topic, message);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sending message for topic {}: {}", topic, message);
+            }
+
             Future future = p.send(record);
+
             messageContexts.add(new MessageContext(future, message));
         }
 
-        List<String> failedMessages = new ArrayList<>();
-        Exception lastFailureException = null;
+        List<String> failedMessages       = new ArrayList<>();
+        Exception    lastFailureException = null;
+
         for (MessageContext context : messageContexts) {
             try {
                 RecordMetadata response = context.getFuture().get();
-                LOG.debug("Sent message for topic - {}, partition - {}, offset 
- {}", response.topic(),
-                    response.partition(), response.offset());
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Sent message for topic - {}, partition - {}, 
offset - {}", response.topic(), response.partition(), response.offset());
+                }
             } catch (Exception e) {
                 lastFailureException = e;
+
                 failedMessages.add(context.getMessage());
             }
         }
+
         if (lastFailureException != null) {
             throw new NotificationException(lastFailureException, 
failedMessages);
         }
     }
 
 
-    public KafkaConsumer  getKafkaConsumer(Properties consumerProperties, 
NotificationType type, boolean autoCommitEnabled) {
+    public KafkaConsumer getKafkaConsumer(Properties consumerProperties, 
NotificationType type, boolean autoCommitEnabled) {
         if(this.consumer == null) {
             try {
                 String topic = TOPIC_MAP.get(type);
+
                 consumerProperties.put("enable.auto.commit", 
autoCommitEnabled);
+
                 this.consumer = new KafkaConsumer(consumerProperties);
+
                 this.consumer.subscribe(Arrays.asList(topic));
-            }catch (Exception ee) {
+            } catch (Exception ee) {
                 LOG.error("Exception in getKafkaConsumer ", ee);
             }
         }
@@ -254,110 +241,39 @@ public class KafkaNotification extends AbstractNotification implements Service {
     }
 
 
-
-
     // Get properties for consumer request
     private Properties getConsumerProperties(NotificationType type) {
         // find the configured group id for the given notification type
-
         String groupId = properties.getProperty(type.toString().toLowerCase() 
+ "." + CONSUMER_GROUP_ID_PROPERTY);
+
         if (StringUtils.isEmpty(groupId)) {
             throw new IllegalStateException("No configuration group id set for 
the notification type " + type);
         }
 
         Properties consumerProperties = new Properties();
+
         consumerProperties.putAll(properties);
         consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
-        LOG.info("Consumer property: atlas.kafka.enable.auto.commit: {}", 
consumerProperties.getProperty("enable.auto.commit"));
         return consumerProperties;
     }
 
-    private File constructDir(String dirPrefix) {
-        File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), 
dirPrefix);
-        if (!file.exists() && !file.mkdirs()) {
-            throw new RuntimeException("could not create temp directory: " + 
file.getAbsolutePath());
-        }
-        return file;
-    }
-
     private synchronized void createProducer() {
+        LOG.info("==> KafkaNotification.createProducer()");
+
         if (producer == null) {
             producer = new KafkaProducer(properties);
         }
-    }
 
-    private URL getURL(String url) throws MalformedURLException {
-        try {
-            return new URL(url);
-        } catch (MalformedURLException e) {
-            return new URL("http://" + url);
-        }
-    }
-
-    private String startZk() throws IOException, InterruptedException, 
URISyntaxException {
-        String zkValue = properties.getProperty("zookeeper.connect");
-        LOG.debug("Starting zookeeper at {}", zkValue);
-
-        URL zkAddress = getURL(zkValue);
-        this.factory = NIOServerCnxnFactory.createFactory(
-                new InetSocketAddress(zkAddress.getHost(), 
zkAddress.getPort()), 1024);
-        File snapshotDir = constructDir("zk/txn");
-        File logDir = constructDir("zk/snap");
-
-        factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
-        return factory.getLocalAddress().getAddress().toString();
-    }
-
-    private void startKafka() throws IOException, URISyntaxException {
-        String kafkaValue = 
properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
-        LOG.debug("Starting kafka at {}", kafkaValue);
-        URL kafkaAddress = getURL(kafkaValue);
-
-        Properties brokerConfig = properties;
-        brokerConfig.setProperty("broker.id", "1");
-        brokerConfig.setProperty("host.name", kafkaAddress.getHost());
-        brokerConfig.setProperty("port", 
String.valueOf(kafkaAddress.getPort()));
-        brokerConfig.setProperty("log.dirs", 
constructDir("kafka").getAbsolutePath());
-        brokerConfig.setProperty("log.flush.interval.messages", 
String.valueOf(1));
-
-        kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new 
SystemTime(),
-                Option.apply(this.getClass().getName()));
-        kafkaServer.startup();
-        LOG.debug("Embedded kafka server started with broker config {}", 
brokerConfig);
-    }
-
-
-    // ----- inner class : SystemTime ----------------------------------------
-
-    private static class SystemTime implements Time {
-        @Override
-        public long milliseconds() {
-            return System.currentTimeMillis();
-        }
-
-        @Override
-        public long nanoseconds() {
-            return System.nanoTime();
-        }
-
-        @Override
-        public void sleep(long arg0) {
-            try {
-                Thread.sleep(arg0);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
+        LOG.info("<== KafkaNotification.createProducer()");
     }
 
     private class MessageContext {
-
         private final Future<RecordMetadata> future;
-        private final String message;
+        private final String                 message;
 
         public MessageContext(Future<RecordMetadata> future, String message) {
-            this.future = future;
+            this.future  = future;
             this.message = message;
         }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 3e5880a..7a3bfe4 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -25,7 +25,6 @@ import com.google.gson.JsonParser;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.ha.HAConfiguration;
 import 
org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.IStruct;
@@ -63,8 +62,6 @@ public abstract class AbstractNotification implements NotificationInterface {
      */
     public static final MessageVersion CURRENT_MESSAGE_VERSION = new 
MessageVersion("1.0.0");
 
-    public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + 
".embedded";
-
     public static final int MAX_BYTES_PER_CHAR = 4;  // each char can encode 
upto 4 bytes in UTF-8
 
     /**
@@ -77,8 +74,6 @@ public abstract class AbstractNotification implements NotificationInterface {
      */
     private static String currentUser = "";
 
-    private final boolean embedded;
-    private final boolean isHAEnabled;
 
     /**
      * Used for message serialization.
@@ -93,14 +88,10 @@ public abstract class AbstractNotification implements NotificationInterface {
     // ----- Constructors ----------------------------------------------------
 
     public AbstractNotification(Configuration applicationProperties) throws 
AtlasException {
-        this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, 
false);
-        this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties);
     }
 
     @VisibleForTesting
     protected AbstractNotification() {
-        embedded = false;
-        isHAEnabled = false;
     }
 
     // ----- NotificationInterface -------------------------------------------
@@ -127,25 +118,6 @@ public abstract class AbstractNotification implements NotificationInterface {
     }
 
     // ----- AbstractNotification --------------------------------------------
-
-    /**
-     * Determine whether or not the notification service embedded in Atlas 
server.
-     *
-     * @return true if the the notification service embedded in Atlas server.
-     */
-    protected final boolean isEmbedded() {
-        return embedded;
-    }
-
-    /**
-     * Determine whether or not the high availability feature is enabled.
-     *
-     * @return true if the high availability feature is enabled.
-     */
-    protected final boolean isHAEnabled() {
-        return isHAEnabled;
-    }
-
     /**
      * Send the given messages.
      *

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index a1e13b9..11f4097 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -33,25 +33,19 @@ import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificati
 import java.util.List;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
 public class KafkaNotificationTest {
-
+    private EmbeddedKafkaServer kafkaServer;
     private KafkaNotification kafkaNotification;
 
     @BeforeClass
     public void setup() throws Exception {
-        Configuration properties = ApplicationProperties.get();
-        properties.setProperty("atlas.kafka.data", "target/" + 
RandomStringUtils.randomAlphanumeric(5));
-
-        kafkaNotification = new KafkaNotification(properties);
-        kafkaNotification.start();
+        initNotificationService();
     }
 
     @AfterClass
     public void shutdown() throws Exception {
-        kafkaNotification.close();
-        kafkaNotification.stop();
+        cleanUpNotificationService();
     }
 
     @Test
@@ -84,4 +78,29 @@ public class KafkaNotificationTest {
 
         consumer.close();
     }
+
+    void initNotificationService() throws Exception {
+        Configuration applicationProperties = ApplicationProperties.get();
+
+        applicationProperties.setProperty("atlas.kafka.data", "target/" + 
RandomStringUtils.randomAlphanumeric(5));
+
+        kafkaServer       = new EmbeddedKafkaServer(applicationProperties);
+        kafkaNotification = new KafkaNotification(applicationProperties);
+
+        kafkaServer.start();
+        kafkaNotification.start();
+
+        Thread.sleep(2000);
+    }
+
+    void cleanUpNotificationService() throws Exception {
+        if (kafkaNotification != null) {
+            kafkaNotification.close();
+            kafkaNotification.stop();
+        }
+
+        if (kafkaServer != null) {
+            kafkaServer.stop();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java 
b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index 7e94330..421720a 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -33,6 +33,7 @@ import 
org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
 import org.apache.atlas.typesystem.types.TraitType;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.web.integration.BaseResourceIT;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -46,7 +47,6 @@ import static org.testng.Assert.assertTrue;
  * Entity Notification Integration Tests.
  */
 public class EntityNotificationIT extends BaseResourceIT {
-
     private final String DATABASE_NAME = "db" + randomString();
     private final String TABLE_NAME = "table" + randomString();
     private NotificationInterface notificationInterface = 
NotificationProvider.get();
@@ -58,6 +58,9 @@ public class EntityNotificationIT extends BaseResourceIT {
     @BeforeClass
     public void setUp() throws Exception {
         super.setUp();
+
+        initNotificationService();
+
         createTypeDefinitionsV1();
         Referenceable HiveDBInstance = 
createHiveDBInstanceBuiltIn(DATABASE_NAME);
         dbId = createInstance(HiveDBInstance);
@@ -65,6 +68,11 @@ public class EntityNotificationIT extends BaseResourceIT {
         notificationConsumer = 
notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES,
 1).get(0);
     }
 
+    @AfterClass
+    public void teardown() throws Exception {
+        cleanUpNotificationService();
+    }
+
     public void testCreateEntity() throws Exception {
         Referenceable tableInstance = 
createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId);
         tableId = createInstance(tableInstance);

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index d41db3e..953e353 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -19,7 +19,6 @@
 package org.apache.atlas.notification;
 
 import org.apache.atlas.EntityAuditEvent;
-import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.notification.hook.HookNotification;
 import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
@@ -47,17 +46,18 @@ public class NotificationHookConsumerIT extends 
BaseResourceIT {
     public static final String QUALIFIED_NAME = "qualifiedName";
     public static final String CLUSTER_NAME = "clusterName";
 
-    private NotificationInterface notificationInterface = 
NotificationProvider.get();
-
     @BeforeClass
     public void setUp() throws Exception {
         super.setUp();
+
+        initNotificationService();
+
         createTypeDefinitionsV1();
     }
 
     @AfterClass
     public void teardown() throws Exception {
-        notificationInterface.close();
+        cleanUpNotificationService();
     }
 
     private void sendHookMessage(HookNotificationMessage message) throws 
NotificationException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index eb37fa8..e7a400e 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -24,9 +24,9 @@ import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.kafka.KafkaNotification;
-import org.apache.atlas.kafka.NotificationProvider;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.kafka.*;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.EntityStream;
@@ -44,7 +44,6 @@ import org.testng.annotations.Test;
 import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import java.util.List;
 
-import org.apache.atlas.kafka.AtlasKafkaConsumer;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
@@ -61,7 +60,9 @@ public class NotificationHookConsumerKafkaTest {
     public static final String NAME = "name";
     public static final String DESCRIPTION = "description";
     public static final String QUALIFIED_NAME = "qualifiedName";
-    private NotificationInterface notificationInterface = 
NotificationProvider.get();
+    private NotificationInterface notificationInterface = null;
+    private EmbeddedKafkaServer   kafkaServer           = null;
+    private KafkaNotification     kafkaNotification     = null;
 
 
     @Mock
@@ -76,8 +77,6 @@ public class NotificationHookConsumerKafkaTest {
     @Mock
     private AtlasTypeRegistry typeRegistry;
 
-    private KafkaNotification kafkaNotification;
-
     @BeforeTest
     public void setup() throws AtlasException, InterruptedException, 
AtlasBaseException {
         MockitoAnnotations.initMocks(this);
@@ -85,65 +84,54 @@ public class NotificationHookConsumerKafkaTest {
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
         AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = 
mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
         
when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
-        kafkaNotification = startKafkaServer();
+
+        initNotificationService();
     }
 
     @AfterTest
     public void shutdown() {
-        kafkaNotification.close();
-        kafkaNotification.stop();
+        cleanUpNotificationService();
     }
 
     @Test
     public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws 
AtlasException, InterruptedException, AtlasBaseException {
-        try {
-            produceMessage(new 
HookNotification.EntityCreateRequest("test_user1", createEntity()));
+        produceMessage(new HookNotification.EntityCreateRequest("test_user1", 
createEntity()));
 
-            NotificationConsumer<HookNotificationMessage> consumer = 
createNewConsumer(kafkaNotification, false);
-            NotificationHookConsumer notificationHookConsumer =
-                    new NotificationHookConsumer(notificationInterface, 
atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-            NotificationHookConsumer.HookConsumer hookConsumer = 
notificationHookConsumer.new HookConsumer(consumer);
+        NotificationConsumer<HookNotificationMessage> consumer = 
createNewConsumer(kafkaNotification, false);
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(notificationInterface, 
atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer hookConsumer = 
notificationHookConsumer.new HookConsumer(consumer);
 
-            consumeOneMessage(consumer, hookConsumer);
-            verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), 
anyBoolean());
+        consumeOneMessage(consumer, hookConsumer);
+        verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), 
anyBoolean());
 
-            // produce another message, and make sure it moves ahead. If 
commit succeeded, this would work.
-            produceMessage(new 
HookNotification.EntityCreateRequest("test_user2", createEntity()));
-            consumeOneMessage(consumer, hookConsumer);
-            
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), 
anyBoolean());
-            reset(atlasEntityStore);
-        }
-        finally {
-            kafkaNotification.close();
-        }
+        // produce another message, and make sure it moves ahead. If commit 
succeeded, this would work.
+        produceMessage(new HookNotification.EntityCreateRequest("test_user2", 
createEntity()));
+        consumeOneMessage(consumer, hookConsumer);
+        
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), 
anyBoolean());
+        reset(atlasEntityStore);
     }
 
     @Test(dependsOnMethods = 
"testConsumerConsumesNewMessageWithAutoCommitDisabled")
     public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws 
Exception {
-        try {
-            produceMessage(new 
HookNotification.EntityCreateRequest("test_user3", createEntity()));
-
-            NotificationConsumer<HookNotificationMessage> consumer = 
createNewConsumer(kafkaNotification, true);
+        produceMessage(new HookNotification.EntityCreateRequest("test_user3", 
createEntity()));
 
-            assertNotNull (consumer);
+        NotificationConsumer<HookNotificationMessage> consumer = 
createNewConsumer(kafkaNotification, true);
 
-            NotificationHookConsumer notificationHookConsumer =
-                    new NotificationHookConsumer(notificationInterface, 
atlasEntityStore, serviceState, instanceConverter, typeRegistry);
-            NotificationHookConsumer.HookConsumer hookConsumer = 
notificationHookConsumer.new HookConsumer(consumer);
+        assertNotNull (consumer);
 
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(notificationInterface, 
atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+        NotificationHookConsumer.HookConsumer hookConsumer = 
notificationHookConsumer.new HookConsumer(consumer);
 
-            consumeOneMessage(consumer, hookConsumer);
-            verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), 
anyBoolean());
+        consumeOneMessage(consumer, hookConsumer);
+        verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), 
anyBoolean());
 
-            // produce another message, but this will not be consumed, as 
commit code is not executed in hook consumer.
-            produceMessage(new 
HookNotification.EntityCreateRequest("test_user4", createEntity()));
+        // produce another message, but this will not be consumed, as commit 
code is not executed in hook consumer.
+        produceMessage(new HookNotification.EntityCreateRequest("test_user4", 
createEntity()));
 
-            consumeOneMessage(consumer, hookConsumer);
-            
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), 
anyBoolean());
-        }
-        finally {
-            kafkaNotification.close();
-        }
+        consumeOneMessage(consumer, hookConsumer);
+        
verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), 
anyBoolean());
     }
 
     AtlasKafkaConsumer<HookNotificationMessage> 
createNewConsumer(KafkaNotification kafkaNotification, boolean 
autoCommitEnabled) {
@@ -178,22 +166,43 @@ public class NotificationHookConsumerKafkaTest {
         return entity;
     }
 
-    KafkaNotification startKafkaServer() throws AtlasException, 
InterruptedException {
+    protected String randomString() {
+        return RandomStringUtils.randomAlphanumeric(10);
+    }
+
+    private void produceMessage(HookNotification message) throws 
NotificationException {
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, 
message);
+    }
+
+    void initNotificationService() throws AtlasException, InterruptedException 
{
         Configuration applicationProperties = ApplicationProperties.get();
         applicationProperties.setProperty("atlas.kafka.data", "target/" + 
RandomStringUtils.randomAlphanumeric(5));
 
-        kafkaNotification = new KafkaNotification(applicationProperties);
+        kafkaServer           = new EmbeddedKafkaServer(applicationProperties);
+        kafkaNotification     = new KafkaNotification(applicationProperties);
+        notificationInterface = kafkaNotification;
+
+        kafkaServer.start();
         kafkaNotification.start();
         Thread.sleep(2000);
-        return kafkaNotification;
     }
 
-    protected String randomString() {
-        return RandomStringUtils.randomAlphanumeric(10);
+    /**
+     * Closes and stops the notification client, then stops the embedded Kafka
+     * server so that it does not keep running after this test finishes.
+     */
+    void cleanUpNotificationService() {
+        if (kafkaNotification != null) {
+            kafkaNotification.close();
+            kafkaNotification.stop();
+        }
+
+        if (kafkaServer != null) {
+            kafkaServer.stop();
+        }
     }
 
     private void produceMessage(HookNotificationMessage message) throws 
NotificationException {
         kafkaNotification.send(NotificationInterface.NotificationType.HOOK, 
message);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java 
b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index 512750f..3ee67b3 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -40,10 +40,10 @@ import 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.kafka.*;
 import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
@@ -102,6 +102,11 @@ public abstract class BaseResourceIT {
     protected static final int MAX_WAIT_TIME = 60000;
     protected String[] atlasUrls;
 
+
+    protected NotificationInterface notificationInterface = null;
+    protected EmbeddedKafkaServer   kafkaServer           = null;
+    protected KafkaNotification     kafkaNotification     = null;
+
     @BeforeClass
     public void setUp() throws Exception {
 
@@ -681,4 +686,30 @@ public abstract class BaseResourceIT {
     protected JSONArray searchByDSL(String dslQuery) throws 
AtlasServiceException {
         return atlasClientV1.searchByDSL(dslQuery, 10, 0);
     }
+
+    protected void initNotificationService() throws Exception {
+        Configuration applicationProperties = ApplicationProperties.get();
+
+        applicationProperties.setProperty("atlas.kafka.data", "target/" + 
RandomStringUtils.randomAlphanumeric(5));
+
+        kafkaServer           = new EmbeddedKafkaServer(applicationProperties);
+        kafkaNotification     = new KafkaNotification(applicationProperties);
+        notificationInterface = kafkaNotification;
+
+        kafkaServer.start();
+        kafkaNotification.start();
+
+        Thread.sleep(2000);
+    }
+
+    protected void cleanUpNotificationService() {
+        if (kafkaNotification != null) {
+            kafkaNotification.close();
+            kafkaNotification.stop();
+        }
+
+        if (kafkaServer != null) {
+            kafkaServer.stop();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/bb2d8d9b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
 
b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
index 065a44d..f2a7801 100755
--- 
a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
+++ 
b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java
@@ -78,8 +78,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
 
     private static final String TRAITS = "traits";
 
-    private NotificationInterface notificationInterface = 
NotificationProvider.get();
-
     @BeforeClass
     public void setUp() throws Exception {
         super.setUp();

Reply via email to