Repository: atlas Updated Branches: refs/heads/master 3205ca4af -> 3ee4f2535
ATLAS-2289: separate embedded kafka/zookeeper start/stop from KafkaNotification Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/3ee4f253 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/3ee4f253 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/3ee4f253 Branch: refs/heads/master Commit: 3ee4f25355be771e70fe714a519c375f5e676a60 Parents: 3205ca4 Author: Madhan Neethiraj <[email protected]> Authored: Tue Nov 28 15:03:44 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Nov 30 19:26:33 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/atlas/service/Services.java | 18 +- .../apache/atlas/kafka/EmbeddedKafkaServer.java | 185 +++++++++++++ .../apache/atlas/kafka/KafkaNotification.java | 257 +++++++------------ .../notification/AbstractNotification.java | 60 ----- .../atlas/kafka/KafkaNotificationTest.java | 36 ++- .../notification/EntityNotificationIT.java | 9 +- .../NotificationHookConsumerIT.java | 7 +- .../NotificationHookConsumerKafkaTest.java | 101 ++++---- .../atlas/web/integration/BaseResourceIT.java | 32 +++ .../web/integration/EntityJerseyResourceIT.java | 2 - 10 files changed, 404 insertions(+), 303 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/common/src/main/java/org/apache/atlas/service/Services.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java index 6f880e4..1267dc9 100644 --- a/common/src/main/java/org/apache/atlas/service/Services.java +++ b/common/src/main/java/org/apache/atlas/service/Services.java @@ -51,6 +51,7 @@ public class Services { try { for (Service service : services) { LOG.info("Starting service {}", service.getClass().getName()); + service.start(); } } catch (Exception e) { @@ -61,12 +62,17 @@ public class Services { @PreDestroy public void stop() { - for (Service service : services) { - LOG.info("Stopping service {}", service.getClass().getName()); - try { - service.stop(); - } catch (Throwable e) { - LOG.warn("Error stopping service {}", service.getClass().getName(), e); + if (configuration.getBoolean("atlas.services.enabled", true)) { + for (int idx = services.size() - 1; idx >= 0; idx--) { + Service service = services.get(idx); + + LOG.info("Stopping service {}", service.getClass().getName()); + + try { + service.stop(); + } catch (Throwable e) { + LOG.warn("Error stopping service {}", service.getClass().getName(), e); + } } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java new file mode 100644 index 0000000..33c8296 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.kafka; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.Time; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.service.Service; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationConverter; +import org.apache.kafka.clients.producer.*; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import scala.Option; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.*; + + +@Component +@Order(2) +public class EmbeddedKafkaServer implements Service { + public static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class); + + public static final String PROPERTY_PREFIX = "atlas.kafka"; + private static final String ATLAS_KAFKA_DATA = "data"; + public static final String PROPERTY_EMBEDDED = "atlas.notification.embedded"; + + private final boolean isEmbedded; + private final Properties properties; + private KafkaServer kafkaServer; + private ServerCnxnFactory factory; + + + @Inject + public EmbeddedKafkaServer(Configuration applicationProperties) throws AtlasException { + Configuration kafkaConf = ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX); + + this.isEmbedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false); + this.properties = ConfigurationConverter.getProperties(kafkaConf); + } + + @Override + public void start() throws AtlasException { + LOG.info("==> EmbeddedKafkaServer.start(isEmbedded={})", isEmbedded); + + if (isEmbedded) { + try { + startZk(); + startKafka(); + } catch (Exception e) { + throw new AtlasException("Failed to start embedded kafka", e); + } + } else { + LOG.info("==> EmbeddedKafkaServer.start(): not embedded..nothing todo"); + } + + LOG.info("<== EmbeddedKafkaServer.start(isEmbedded={})", isEmbedded); + } + + @Override + public void stop() { + LOG.info("==> EmbeddedKafkaServer.stop(isEmbedded={})", isEmbedded); + + if (kafkaServer != null) { + kafkaServer.shutdown(); + } + + if (factory != null) { + factory.shutdown(); + } + + LOG.info("<== EmbeddedKafka.stop(isEmbedded={})", isEmbedded); + } + + private String startZk() throws IOException, InterruptedException, URISyntaxException { + String zkValue = properties.getProperty("zookeeper.connect"); + + LOG.info("Starting zookeeper at {}", zkValue); + + URL zkAddress = getURL(zkValue); + File snapshotDir = constructDir("zk/txn"); + File logDir = constructDir("zk/snap"); + + factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024); + + factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500)); + + String ret = factory.getLocalAddress().getAddress().toString(); + + LOG.info("Embedded zookeeper for Kafka started at {}", ret); + + return ret; + } + + private void startKafka() throws IOException, URISyntaxException { + String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); + + LOG.info("Starting kafka at {}", kafkaValue); + + URL kafkaAddress = getURL(kafkaValue); + Properties brokerConfig = properties; + + brokerConfig.setProperty("broker.id", "1"); + brokerConfig.setProperty("host.name", kafkaAddress.getHost()); + brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort())); + brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath()); + brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1)); + + kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName())); + + kafkaServer.startup(); + + LOG.info("Embedded kafka server started with broker config {}", brokerConfig); + } + + private File constructDir(String dirPrefix) { + File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix); + + if (!file.exists() && !file.mkdirs()) { + throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath()); + } + + return file; + } + + private URL getURL(String url) throws MalformedURLException { + try { + return new URL(url); + } catch (MalformedURLException e) { + return new URL("http://" + url); + } + } + + + // ----- inner class : SystemTime ---------------------------------------- + private static class SystemTime implements Time { + @Override + public long milliseconds() { + return System.currentTimeMillis(); + } + + @Override + public long nanoseconds() { + return System.nanoTime(); + } + + @Override + public void sleep(long arg0) { + try { + Thread.sleep(arg0); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 152144b..80dc514 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -18,9 +18,6 @@ package org.apache.atlas.kafka; import com.google.common.annotations.VisibleForTesting; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.Time; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.notification.AbstractNotification; @@ -37,28 +34,13 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -import scala.Option; import javax.inject.Inject; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.HashMap; -import java.util.ArrayList; -import java.util.Properties; +import java.util.*; import java.util.concurrent.Future; /** @@ -69,22 +51,11 @@ import java.util.concurrent.Future; public class KafkaNotification extends AbstractNotification implements Service { public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class); - public static final String PROPERTY_PREFIX = "atlas.kafka"; - - private static final String ATLAS_KAFKA_DATA = "data"; - - public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK"; - public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES"; - + public static final String PROPERTY_PREFIX = "atlas.kafka"; + public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK"; + public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES"; protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id"; - private KafkaServer kafkaServer; - private ServerCnxnFactory factory; - private Properties properties; - private KafkaConsumer consumer = null; - private KafkaProducer producer = null; - private Long pollTimeOutMs = 1000L; - private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() { { put(NotificationType.HOOK, ATLAS_HOOK_TOPIC); @@ -92,10 +63,10 @@ public class KafkaNotification extends AbstractNotification implements Service { } }; - @VisibleForTesting - String getTopicName(NotificationType notificationType) { - return TOPIC_MAP.get(notificationType); - } + private final Properties properties; + private final Long pollTimeOutMs; + private KafkaConsumer consumer; + private KafkaProducer producer; // ----- Constructors ---------------------------------------------------- @@ -109,144 +80,159 @@ public class KafkaNotification extends AbstractNotification implements Service { @Inject public KafkaNotification(Configuration applicationProperties) throws AtlasException { super(applicationProperties); - Configuration subsetConfiguration = - ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX); - properties = ConfigurationConverter.getProperties(subsetConfiguration); - //override to store offset in kafka - //todo do we need ability to replay? + + LOG.info("==> KafkaNotification()"); + + Configuration kafkaConf = ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX); + + properties = ConfigurationConverter.getProperties(kafkaConf); + pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000); //Override default configs - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer"); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer"); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - pollTimeOutMs = subsetConfiguration.getLong("poll.timeout.ms", 1000); - boolean oldApiCommitEnbleFlag = subsetConfiguration.getBoolean("auto.commit.enable",false); + boolean oldApiCommitEnableFlag = kafkaConf.getBoolean("auto.commit.enable", false); + //set old autocommit value if new autoCommit property is not set. - properties.put("enable.auto.commit", subsetConfiguration.getBoolean("enable.auto.commit", oldApiCommitEnbleFlag)); - properties.put("session.timeout.ms", subsetConfiguration.getString("session.timeout.ms", "30000")); + properties.put("enable.auto.commit", kafkaConf.getBoolean("enable.auto.commit", oldApiCommitEnableFlag)); + properties.put("session.timeout.ms", kafkaConf.getString("session.timeout.ms", "30000")); + LOG.info("<== KafkaNotification()"); } @VisibleForTesting protected KafkaNotification(Properties properties) { - this.properties = properties; + super(); + + LOG.info("==> KafkaNotification()"); + + this.properties = properties; + this.pollTimeOutMs = 1000L; + + LOG.info("<== KafkaNotification()"); + } + + @VisibleForTesting + String getTopicName(NotificationType notificationType) { + return TOPIC_MAP.get(notificationType); } // ----- Service --------------------------------------------------------- @Override public void start() throws AtlasException { - if (isHAEnabled()) { - LOG.info("Not starting embedded instances when HA is enabled."); - return; - } - if (isEmbedded()) { - try { - startZk(); - startKafka(); - } catch (Exception e) { - throw new AtlasException("Failed to start embedded kafka", e); - } - } + LOG.info("==> KafkaNotification.start()"); + + LOG.info("<== KafkaNotification.start()"); } @Override public void stop() { - if (kafkaServer != null) { - kafkaServer.shutdown(); - } + LOG.info("==> KafkaNotification.stop()"); - if (factory != null) { - factory.shutdown(); - } + LOG.info("<== KafkaNotification.stop()"); } // ----- NotificationInterface ------------------------------------------- - @Override - public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, - int numConsumers) { - return createConsumers(notificationType, numConsumers, - Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false")))); + public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) { + return createConsumers(notificationType, numConsumers, Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false")))); } @VisibleForTesting - public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, - int numConsumers, boolean autoCommitEnabled) { + public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) { + LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled); - Properties consumerProperties = getConsumerProperties(notificationType); + Properties consumerProperties = getConsumerProperties(notificationType); + AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs); - List<NotificationConsumer<T>> consumers = new ArrayList<>(); - AtlasKafkaConsumer kafkaConsumer =new AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs); + List<NotificationConsumer<T>> consumers = Collections.singletonList(kafkaConsumer); + + LOG.info("<== KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled); - consumers.add(kafkaConsumer); return consumers; } @Override public void close() { + LOG.info("==> KafkaNotification.close()"); + if (producer != null) { producer.close(); + producer = null; } + + LOG.info("<== KafkaNotification.close()"); } // ----- AbstractNotification -------------------------------------------- - @Override public void sendInternal(NotificationType type, List<String> messages) throws NotificationException { if (producer == null) { createProducer(); } + sendInternalToProducer(producer, type, messages); } @VisibleForTesting void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException { - String topic = TOPIC_MAP.get(type); + String topic = TOPIC_MAP.get(type); List<MessageContext> messageContexts = new ArrayList<>(); + for (String message : messages) { ProducerRecord record = new ProducerRecord(topic, message); - LOG.debug("Sending message for topic {}: {}", topic, message); + + if (LOG.isDebugEnabled()) { + LOG.debug("Sending message for topic {}: {}", topic, message); + } + Future future = p.send(record); + messageContexts.add(new MessageContext(future, message)); } - List<String> failedMessages = new ArrayList<>(); - Exception lastFailureException = null; + List<String> failedMessages = new ArrayList<>(); + Exception lastFailureException = null; + for (MessageContext context : messageContexts) { try { RecordMetadata response = context.getFuture().get(); - LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), - response.partition(), response.offset()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(), response.partition(), response.offset()); + } } catch (Exception e) { lastFailureException = e; + failedMessages.add(context.getMessage()); } } + if (lastFailureException != null) { throw new NotificationException(lastFailureException, failedMessages); } } - public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) { + public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) { if(this.consumer == null) { try { String topic = TOPIC_MAP.get(type); + consumerProperties.put("enable.auto.commit", autoCommitEnabled); + this.consumer = new KafkaConsumer(consumerProperties); + this.consumer.subscribe(Arrays.asList(topic)); - }catch (Exception ee) { + } catch (Exception ee) { LOG.error("Exception in getKafkaConsumer ", ee); } } @@ -255,110 +241,39 @@ public class KafkaNotification extends AbstractNotification implements Service { } - - // Get properties for consumer request private Properties getConsumerProperties(NotificationType type) { // find the configured group id for the given notification type - String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY); + if (StringUtils.isEmpty(groupId)) { throw new IllegalStateException("No configuration group id set for the notification type " + type); } Properties consumerProperties = new Properties(); + consumerProperties.putAll(properties); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - LOG.info("Consumer property: atlas.kafka.enable.auto.commit: {}", consumerProperties.getProperty("enable.auto.commit")); return consumerProperties; } - private File constructDir(String dirPrefix) { - File file = new File(properties.getProperty(ATLAS_KAFKA_DATA), dirPrefix); - if (!file.exists() && !file.mkdirs()) { - throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath()); - } - return file; - } - private synchronized void createProducer() { + LOG.info("==> KafkaNotification.createProducer()"); + if (producer == null) { producer = new KafkaProducer(properties); } - } - - private URL getURL(String url) throws MalformedURLException { - try { - return new URL(url); - } catch (MalformedURLException e) { - return new URL("http://" + url); - } - } - - private String startZk() throws IOException, InterruptedException, URISyntaxException { - String zkValue = properties.getProperty("zookeeper.connect"); - LOG.debug("Starting zookeeper at {}", zkValue); - - URL zkAddress = getURL(zkValue); - this.factory = NIOServerCnxnFactory.createFactory( - new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024); - File snapshotDir = constructDir("zk/txn"); - File logDir = constructDir("zk/snap"); - factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500)); - return factory.getLocalAddress().getAddress().toString(); - } - - private void startKafka() throws IOException, URISyntaxException { - String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); - LOG.debug("Starting kafka at {}", kafkaValue); - URL kafkaAddress = getURL(kafkaValue); - - Properties brokerConfig = properties; - brokerConfig.setProperty("broker.id", "1"); - brokerConfig.setProperty("host.name", kafkaAddress.getHost()); - brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort())); - brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath()); - brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1)); - - kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), - Option.apply(this.getClass().getName())); - kafkaServer.startup(); - LOG.debug("Embedded kafka server started with broker config {}", brokerConfig); - } - - - // ----- inner class : SystemTime ---------------------------------------- - - private static class SystemTime implements Time { - @Override - public long milliseconds() { - return System.currentTimeMillis(); - } - - @Override - public long nanoseconds() { - return System.nanoTime(); - } - - @Override - public void sleep(long arg0) { - try { - Thread.sleep(arg0); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + LOG.info("<== KafkaNotification.createProducer()"); } private class MessageContext { - private final Future<RecordMetadata> future; - private final String message; + private final String message; public MessageContext(Future<RecordMetadata> future, String message) { - this.future = future; + this.future = future; this.message = message; } http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 8bc7cb4..45a66bf 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -18,26 +18,18 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; import org.apache.atlas.AtlasException; -import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.model.notification.AtlasNotificationBaseMessage; import org.apache.atlas.model.notification.AtlasNotificationMessage; import org.apache.atlas.model.notification.AtlasNotificationStringMessage; -import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind; import org.apache.atlas.type.AtlasType; import org.apache.atlas.model.notification.MessageVersion; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; -import org.codehaus.jettison.json.JSONArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Type; import java.net.Inet4Address; import java.net.UnknownHostException; import java.util.ArrayList; @@ -63,8 +55,6 @@ public abstract class AbstractNotification implements NotificationInterface { */ public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0"); - public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; - public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode upto 4 bytes in UTF-8 /** @@ -77,20 +67,13 @@ public abstract class AbstractNotification implements NotificationInterface { */ private static String currentUser = ""; - private final boolean embedded; - private final boolean isHAEnabled; - // ----- Constructors ---------------------------------------------------- public AbstractNotification(Configuration applicationProperties) throws AtlasException { - this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false); - this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties); } @VisibleForTesting protected AbstractNotification() { - embedded = false; - isHAEnabled = false; } // ----- NotificationInterface ------------------------------------------- @@ -117,25 +100,6 @@ public abstract class AbstractNotification implements NotificationInterface { } // ----- AbstractNotification -------------------------------------------- - - /** - * Determine whether or not the notification service embedded in Atlas server. - * - * @return true if the the notification service embedded in Atlas server. - */ - protected final boolean isEmbedded() { - return embedded; - } - - /** - * Determine whether or not the high availability feature is enabled. - * - * @return true if the high availability feature is enabled. - */ - protected final boolean isHAEnabled() { - return isHAEnabled; - } - /** * Send the given messages. * @@ -250,30 +214,6 @@ public abstract class AbstractNotification implements NotificationInterface { } } - - // ----- serializers ----------------------------------------------------- - - /** - * Serializer for Referenceable. - */ - public static final class ReferenceableSerializer implements JsonSerializer<Referenceable> { - @Override - public JsonElement serialize(Referenceable src, Type typeOfSrc, JsonSerializationContext context) { - String instanceJson = AtlasType.toV1Json(src); - return new JsonParser().parse(instanceJson).getAsJsonObject(); - } - } - - /** - * Serializer for JSONArray. - */ - public static final class JSONArraySerializer implements JsonSerializer<JSONArray> { - @Override - public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) { - return new JsonParser().parse(src.toString()).getAsJsonArray(); - } - } - private static String getNextMessageId() { String nextMsgIdPrefix = msgIdPrefix; int nextMsgIdSuffix = msgIdSuffix.getAndIncrement(); http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index e0655f3..63114bf 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -35,22 +35,17 @@ import java.util.List; import static org.testng.Assert.assertEquals; public class KafkaNotificationTest { + private EmbeddedKafkaServer kafkaServer; private KafkaNotification kafkaNotification; @BeforeClass public void setup() throws Exception { - Configuration properties = ApplicationProperties.get(); - - properties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); - - kafkaNotification = new KafkaNotification(properties); - kafkaNotification.start(); + initNotificationService(); } @AfterClass public void shutdown() throws Exception { - kafkaNotification.close(); - kafkaNotification.stop(); + cleanUpNotificationService(); } @Test @@ -81,4 +76,29 @@ public class KafkaNotificationTest { consumer.close(); } + + void initNotificationService() throws Exception { + Configuration applicationProperties = ApplicationProperties.get(); + + applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); + + kafkaServer = new EmbeddedKafkaServer(applicationProperties); + kafkaNotification = new KafkaNotification(applicationProperties); + + kafkaServer.start(); + kafkaNotification.start(); + + Thread.sleep(2000); + } + + void cleanUpNotificationService() throws Exception { + if (kafkaNotification != null) { + kafkaNotification.close(); + kafkaNotification.stop(); + } + + if (kafkaServer != null) { + kafkaServer.stop(); + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java index 486b30b..aaa51bd 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java @@ -30,6 +30,7 @@ import org.apache.atlas.v1.model.typedef.*; import org.apache.atlas.type.AtlasType; import org.apache.atlas.v1.typesystem.types.utils.TypesUtil; import org.apache.atlas.web.integration.BaseResourceIT; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import java.util.*; @@ -44,7 +45,6 @@ import static org.testng.Assert.assertTrue; public class EntityNotificationIT extends BaseResourceIT { private final String DATABASE_NAME = "db" + randomString(); private final String TABLE_NAME = "table" + randomString(); - private final NotificationInterface notificationInterface = NotificationProvider.get(); private Id tableId; private Id dbId; private String traitName; @@ -54,6 +54,8 @@ public class EntityNotificationIT extends BaseResourceIT { public void setUp() throws Exception { super.setUp(); + initNotificationService(); + createTypeDefinitionsV1(); Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME); @@ -63,6 +65,11 @@ public class EntityNotificationIT extends BaseResourceIT { notificationConsumer = notificationInterface.createConsumers(NotificationType.ENTITIES, 1).get(0); } + @AfterClass + public void teardown() throws Exception { + cleanUpNotificationService(); + } + public void testCreateEntity() throws Exception { Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId); http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java index f248593..3dd3e1d 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java @@ -19,7 +19,6 @@ package org.apache.atlas.notification; import org.apache.atlas.EntityAuditEvent; -import org.apache.atlas.kafka.NotificationProvider; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Referenceable; @@ -46,18 +45,18 @@ public class NotificationHookConsumerIT extends BaseResourceIT { public static final String QUALIFIED_NAME = "qualifiedName"; public static final String CLUSTER_NAME = "clusterName"; - private final NotificationInterface notificationInterface = NotificationProvider.get(); - @BeforeClass public void setUp() throws Exception { super.setUp(); + initNotificationService(); + createTypeDefinitionsV1(); } @AfterClass public void teardown() throws Exception { - notificationInterface.close(); + cleanUpNotificationService(); } private void sendHookMessage(HookNotification message) throws NotificationException, InterruptedException { http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 4ea13c7..d2b3dfd 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -22,9 +22,7 @@ import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.kafka.AtlasKafkaMessage; -import org.apache.atlas.kafka.KafkaNotification; -import org.apache.atlas.kafka.NotificationProvider; +import org.apache.atlas.kafka.*; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Referenceable; @@ -45,7 +43,6 @@ import org.testng.annotations.Test; import java.util.List; -import org.apache.atlas.kafka.AtlasKafkaConsumer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; @@ -62,7 +59,9 @@ public class NotificationHookConsumerKafkaTest { public static final String DESCRIPTION = "description"; public static final String QUALIFIED_NAME = "qualifiedName"; - private final NotificationInterface notificationInterface = NotificationProvider.get(); + private NotificationInterface notificationInterface = null; + private EmbeddedKafkaServer kafkaServer = null; + private KafkaNotification kafkaNotification = null; @Mock @@ -77,8 +76,6 @@ public class NotificationHookConsumerKafkaTest { @Mock private AtlasTypeRegistry typeRegistry; - private KafkaNotification kafkaNotification; - @BeforeTest public void setup() throws AtlasException, InterruptedException, AtlasBaseException { MockitoAnnotations.initMocks(this); @@ -90,64 +87,53 @@ public class NotificationHookConsumerKafkaTest { when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity); - kafkaNotification = startKafkaServer(); + initNotificationService(); } @AfterTest public void shutdown() { - kafkaNotification.close(); - kafkaNotification.stop(); + cleanUpNotificationService(); } @Test public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException { - try { - produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); - NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); - consumeOneMessage(consumer, hookConsumer); + consumeOneMessage(consumer, hookConsumer); - verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); - // produce another message, and make sure it moves ahead. If commit succeeded, this would work. - produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity())); - consumeOneMessage(consumer, hookConsumer); + // produce another message, and make sure it moves ahead. If commit succeeded, this would work. + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity())); + consumeOneMessage(consumer, hookConsumer); - verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); - reset(atlasEntityStore); - } - finally { - kafkaNotification.close(); - } + verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); + reset(atlasEntityStore); } @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled") public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception { - try { - produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity())); - NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true); + NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true); - assertNotNull (consumer); + assertNotNull (consumer); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); - consumeOneMessage(consumer, hookConsumer); - verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); + consumeOneMessage(consumer, hookConsumer); + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); - // produce another message, but this will not be consumed, as commit code is not executed in hook consumer. - produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity())); + // produce another message, but this will not be consumed, as commit code is not executed in hook consumer. + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity())); - consumeOneMessage(consumer, hookConsumer); - verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); - } - finally { - kafkaNotification.close(); - } + consumeOneMessage(consumer, hookConsumer); + verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); } AtlasKafkaConsumer<HookNotification> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { @@ -185,25 +171,38 @@ public class NotificationHookConsumerKafkaTest { return entity; } - KafkaNotification startKafkaServer() throws AtlasException, InterruptedException { + protected String randomString() { + return RandomStringUtils.randomAlphanumeric(10); + } + + private void produceMessage(HookNotification message) throws NotificationException { + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message); + } + + void initNotificationService() throws AtlasException, InterruptedException { Configuration applicationProperties = ApplicationProperties.get(); applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); - kafkaNotification = new KafkaNotification(applicationProperties); + kafkaServer = new EmbeddedKafkaServer(applicationProperties); + kafkaNotification = new KafkaNotification(applicationProperties); + notificationInterface = kafkaNotification; + kafkaServer.start(); kafkaNotification.start(); Thread.sleep(2000); - - return kafkaNotification; } - protected String randomString() { - return RandomStringUtils.randomAlphanumeric(10); - } + void cleanUpNotificationService() { + if (kafkaNotification != null) { + kafkaNotification.close(); + kafkaNotification.stop(); + } - private void produceMessage(HookNotification message) throws NotificationException { - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message); + if (kafkaServer != null) { + kafkaServer.stop(); + } } + } http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java index ab27612..fe4d688 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java @@ -34,6 +34,7 @@ import org.apache.atlas.model.typedef.*; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef; +import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; @@ -95,6 +96,11 @@ public abstract class BaseResourceIT { protected AtlasClientV2 atlasClientV2; protected String[] atlasUrls; + + protected NotificationInterface notificationInterface = null; + protected EmbeddedKafkaServer kafkaServer = null; + protected KafkaNotification kafkaNotification = null; + @BeforeClass public void setUp() throws Exception { //set high timeouts so that tests do not fail due to read timeouts while you @@ -675,4 +681,30 @@ public abstract class BaseResourceIT { protected JSONArray searchByDSL(String dslQuery) throws AtlasServiceException { return atlasClientV1.searchByDSL(dslQuery, 10, 0); } + + protected void initNotificationService() throws Exception { + Configuration applicationProperties = ApplicationProperties.get(); + + applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); + + kafkaServer = new EmbeddedKafkaServer(applicationProperties); + kafkaNotification = new KafkaNotification(applicationProperties); + notificationInterface = kafkaNotification; + + kafkaServer.start(); + kafkaNotification.start(); + + Thread.sleep(2000); + } + + protected void cleanUpNotificationService() { + if (kafkaNotification != null) { + kafkaNotification.close(); + kafkaNotification.stop(); + } + + if (kafkaServer != null) { + kafkaServer.stop(); + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/3ee4f253/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java index bae0691..9374719 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java @@ -63,8 +63,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT { private static final String TRAITS = "traits"; - private NotificationInterface notificationInterface = NotificationProvider.get(); - @BeforeClass public void setUp() throws Exception { super.setUp();
