This is an automated email from the ASF dual-hosted git repository.
radhikakundam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 8d8169e83 ATLAS-4335: Hook Notifications through Rest Interface
8d8169e83 is described below
commit 8d8169e83ac4acfb7b152594eac96ddbc9bb8437
Author: radhikakundam <[email protected]>
AuthorDate: Tue Dec 6 11:23:22 2022 -0800
ATLAS-4335: Hook Notifications through Rest Interface
Signed-off-by: radhikakundam <[email protected]>
---
addons/falcon-bridge/pom.xml | 5 -
addons/hbase-bridge/pom.xml | 8 +-
addons/hive-bridge/pom.xml | 5 +
addons/impala-bridge/pom.xml | 5 -
addons/sqoop-bridge/pom.xml | 5 -
.../org/apache/atlas/authorize/AtlasPrivilege.java | 4 +-
.../main/java/org/apache/atlas/AtlasClientV2.java | 15 +-
.../java/org/apache/atlas/AtlasConfiguration.java | 5 +
.../main/java/org/apache/atlas/AtlasErrorCode.java | 4 +-
notification/pom.xml | 4 +-
.../main/java/org/apache/atlas/hook/AtlasHook.java | 6 +
.../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 26 +++-
.../org/apache/atlas/kafka/AtlasKafkaMessage.java | 18 ++-
.../org/apache/atlas/kafka/KafkaNotification.java | 77 ++++++++++-
.../apache/atlas/kafka/NotificationProvider.java | 31 +++--
.../AtlasNotificationMessageDeserializer.java | 7 +
.../atlas/notification/NotificationConsumer.java | 7 +
.../atlas/notification/NotificationInterface.java | 3 +
.../atlas/notification/rest/RestNotification.java | 153 +++++++++++++++++++++
.../AbstractNotificationConsumerTest.java | 5 +
.../atlas/notification/RestNotificationTest.java | 136 ++++++++++++++++++
.../notification/NotificationHookConsumer.java | 117 ++++++++++++++--
.../apache/atlas/web/rest/NotificationREST.java | 121 ++++++++++++++++
.../atlas/web/integration/NotificationRestIT.java | 73 ++++++++++
.../json/notifications/create-db-ddl.json | 1 +
.../resources/json/notifications/create-db.json | 1 +
.../resources/json/notifications/delete-db.json | 1 +
27 files changed, 789 insertions(+), 54 deletions(-)
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
index 1e2ce7c81..3576173b9 100644
--- a/addons/falcon-bridge/pom.xml
+++ b/addons/falcon-bridge/pom.xml
@@ -185,11 +185,6 @@
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
- <artifactItem>
- <groupId>javax.ws.rs</groupId>
- <artifactId>jsr311-api</artifactId>
- <version>${jsr.version}</version>
- </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git a/addons/hbase-bridge/pom.xml b/addons/hbase-bridge/pom.xml
index ca598ab39..d78abbb37 100644
--- a/addons/hbase-bridge/pom.xml
+++ b/addons/hbase-bridge/pom.xml
@@ -63,7 +63,7 @@
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-bundle</artifactId>
- <version>1.19</version>
+ <version>${jersey.version}</version>
<scope>test</scope>
</dependency>
@@ -399,11 +399,6 @@
<artifactId>jersey-bundle</artifactId>
<version>${jersey.version}</version>
</artifactItem>
- <artifactItem>
- <groupId>javax.ws.rs</groupId>
- <artifactId>jsr311-api</artifactId>
- <version>${jsr.version}</version>
- </artifactItem>
</artifactItems>
</configuration>
</execution>
@@ -621,7 +616,6 @@
</execution>
</executions>
</plugin>
-
</plugins>
</build>
</project>
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 35bd4e553..3464b65f0 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -311,6 +311,11 @@
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
+ <artifactItem>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>${jersey.version}</version>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml
index a9759b36d..915dcbb4b 100644
--- a/addons/impala-bridge/pom.xml
+++ b/addons/impala-bridge/pom.xml
@@ -325,11 +325,6 @@
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
- <artifactItem>
- <groupId>javax.ws.rs</groupId>
- <artifactId>jsr311-api</artifactId>
- <version>${jsr.version}</version>
- </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml
index e970c8011..4b6eac98f 100644
--- a/addons/sqoop-bridge/pom.xml
+++ b/addons/sqoop-bridge/pom.xml
@@ -249,11 +249,6 @@
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
- <artifactItem>
- <groupId>javax.ws.rs</groupId>
- <artifactId>jsr311-api</artifactId>
- <version>${jsr.version}</version>
- </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git
a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
index 5d06e1b29..f270844c5 100644
--- a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
+++ b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java
@@ -46,7 +46,9 @@ public enum AtlasPrivilege {
TYPE_READ("type-read"),
- ADMIN_AUDITS("admin-audits");
+ ADMIN_AUDITS("admin-audits"),
+
+ SERVICE_NOTIFICATION_POST("service-notification-post");
private final String type;
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 6910b0e42..4970baaa9 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -134,6 +134,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String GLOSSARY_CATEGORY = GLOSSARY_URI +
"/category";
private static final String GLOSSARY_CATEGORIES = GLOSSARY_URI +
"/categories";
+ //Notification APIs
+ private static final String NOTIFICATION_URI = BASE_URI +
"v2/notification";
+
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword)
{
super(baseUrl, basicAuthUserNamePassword);
@@ -173,7 +176,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
}
@VisibleForTesting
- AtlasClientV2(WebResource service, Configuration configuration) {
+ public AtlasClientV2(WebResource service, Configuration configuration) {
super(service, configuration);
}
@@ -1024,6 +1027,14 @@ public class AtlasClientV2 extends AtlasBaseClient {
return callAPI(API_V2.IMPORT_GLOSSARY, BulkImportResponse.class,
multipartEntity);
}
+ public void postNotificationToTopic(String topic, List<String> messages)
throws AtlasServiceException {
+ callAPI(formatPathParameters(API_V2.POST_NOTIFICATIONS_TO_TOPIC,
topic), (Class<?>) null, messages);
+ }
+
+ @VisibleForTesting
+ public API formatPathWithParameter(API api, String... params) {
+ return formatPathParameters(api, params);
+ }
@Override
protected API formatPathParameters(API api, String... params) {
@@ -1199,6 +1210,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
public static final API_V2 GET_BUSINESS_METADATA_TEMPLATE = new
API_V2(ENTITY_API + "businessmetadata/import/template", HttpMethod.GET,
Response.Status.OK, MediaType.APPLICATION_JSON,
MediaType.APPLICATION_OCTET_STREAM);
public static final API_V2 IMPORT_BUSINESS_METADATA = new
API_V2(ENTITY_API + "businessmetadata/import", HttpMethod.POST,
Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON);
+ public static final API_V2 POST_NOTIFICATIONS_TO_TOPIC = new
API_V2(NOTIFICATION_URI + "/topic/%s", HttpMethod.POST,
Response.Status.NO_CONTENT);
+
// labels APIs
public static final API_V2 ADD_LABELS = new
API_V2(ENTITY_API + "guid/%s/labels", HttpMethod.PUT,
Response.Status.NO_CONTENT);
public static final API_V2 ADD_LABELS_BY_UNIQUE_ATTRIBUTE = new
API_V2(ENTITY_API + "uniqueAttribute/type/%s/labels", HttpMethod.PUT,
Response.Status.NO_CONTENT);
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index e8c7a15ea..df886753f 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -40,9 +40,14 @@ public enum AtlasConfiguration {
NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name",
"ATLAS_HOOK"),
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name",
"ATLAS_ENTITIES"),
+
NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL("atlas.notification.consumer.message.buffering.interval.seconds",
15),
+
NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE("atlas.notification.consumer.message.buffering.batch.size",
100),
+
+ NOTIFICATION_HOOK_REST_ENABLED("atlas.hook.rest.notification.enabled",
false),
NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names",
"ATLAS_HOOK"), // a comma separated list of topic names
NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names",
"ATLAS_ENTITIES"), // a comma separated list of topic names
+
NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES("atlas.notification.rest.body.max.length.bytes",
(1 * 1024 * 1024)),
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes",
(1000 * 1000)),
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled",
true),
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds",
15 * 60),
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 21ac7f78e..77a6fd8c3 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -176,6 +176,7 @@ public enum AtlasErrorCode {
ATTRIBUTE_NAME_ALREADY_EXISTS_IN_ANOTHER_PARENT_TYPE(400,
"ATLAS-400-00-09E", "Invalid attribute name: {0}.{1}. Attribute already exists
in another parent type: {2}"),
IMPORT_INVALID_ZIP_ENTRY(400, "ATLAS-400-00-09F", "{0}: invalid zip entry.
Reason: {1}"),
LINEAGE_ON_DEMAND_NOT_ENABLED(400, "ATLAS-400-00-100", "Lineage on demand
config: {0} is not enabled"),
+ INVALID_TOPIC_NAME(400, "ATLAS-400-00-101", "Unsupported topic name :
{0}"),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to
perform {1}"),
@@ -243,7 +244,8 @@ public enum AtlasErrorCode {
ENTITY_NOTIFICATION_FAILED(500, "ATLAS-500-00-014", "Notification failed
for operation: {0} : {1}"),
FAILED_TO_UPLOAD(500, "ATLAS-500-00-015", "Error occurred while uploading
the file: {0}"),
FAILED_TO_CREATE_GLOSSARY_TERM(500, "ATLAS-500-00-016", "Error occurred
while creating glossary term: {0}"),
- FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred
while updating glossary term: {0}");
+ FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred
while updating glossary term: {0}"),
+ NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}");
private String errorCode;
private String errorMessage;
diff --git a/notification/pom.xml b/notification/pom.xml
index aaf11c7a4..4e5d9a835 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -42,7 +42,9 @@
<dependency>
<groupId>org.apache.atlas</groupId>
- <artifactId>atlas-server-api</artifactId>
+ <artifactId>atlas-client-v2</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 24ea6ea83..4c70aedb9 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -21,6 +21,7 @@ package org.apache.atlas.hook;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.notification.HookNotification;
@@ -63,6 +64,7 @@ public abstract class AtlasHook {
public static final String CONF_METADATA_NAMESPACE
= "atlas.metadata.namespace";
public static final String CLUSTER_NAME_KEY
= "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME
= "primary";
+ public static final String CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED
= "atlas.hook.messages.sort.enabled";
protected static Configuration atlasProperties;
protected static NotificationInterface notificationInterface;
@@ -75,6 +77,8 @@ public abstract class AtlasHook {
private static final int notificationMaxRetries;
private static final int notificationRetryInterval;
private static ExecutorService executor = null;
+ public static final boolean isRESTNotificationEnabled;
+ public static final boolean isHookMsgsSortEnabled;
static {
@@ -95,6 +99,8 @@ public abstract class AtlasHook {
failedMessagesLogger = null;
}
+ isRESTNotificationEnabled =
AtlasConfiguration.NOTIFICATION_HOOK_REST_ENABLED.getBoolean();
+ isHookMsgsSortEnabled =
atlasProperties.getBoolean(CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED,
isRESTNotificationEnabled);
metadataNamespace = getMetadataNamespace(atlasProperties);
notificationMaxRetries =
atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
notificationRetryInterval =
atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 96dc5856a..89ec59caa 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -73,6 +73,11 @@ public class AtlasKafkaConsumer<T> extends
AbstractNotificationConsumer<T> {
return receive(this.pollTimeoutMilliSeconds,
lastCommittedPartitionOffset);
}
+ @Override
+ public List<AtlasKafkaMessage<T>>
receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long>
lastCommittedPartitionOffset) {
+ return receiveRawRecords(this.pollTimeoutMilliSeconds,
lastCommittedPartitionOffset);
+ }
+
@Override
public void commit(TopicPartition partition, long offset) {
@@ -98,7 +103,15 @@ public class AtlasKafkaConsumer<T> extends
AbstractNotificationConsumer<T> {
}
}
+ private List<AtlasKafkaMessage<T>> receiveRawRecords(long
timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
+ return receive(timeoutMilliSeconds, lastCommittedPartitionOffset,
true);
+ }
+
private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds,
Map<TopicPartition, Long> lastCommittedPartitionOffset) {
+ return receive(timeoutMilliSeconds, lastCommittedPartitionOffset,
false);
+ }
+
+ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds,
Map<TopicPartition, Long> lastCommittedPartitionOffset, boolean
isRawDataRequired) {
List<AtlasKafkaMessage<T>> messages = new ArrayList();
ConsumerRecords<?, ?> records = kafkaConsumer != null ?
kafkaConsumer.poll(timeoutMilliSeconds) : null;
@@ -134,8 +147,17 @@ public class AtlasKafkaConsumer<T> extends
AbstractNotificationConsumer<T> {
continue;
}
- messages.add(new AtlasKafkaMessage(message, record.offset(),
record.topic(), record.partition(),
-
deserializer.getMsgCreated(), deserializer.getSpooled()));
+ AtlasKafkaMessage kafkaMessage = null;
+
+ if (isRawDataRequired) {
+ kafkaMessage = new AtlasKafkaMessage(message,
record.offset(), record.topic(), record.partition(),
+ deserializer.getMsgCreated(),
deserializer.getSpooled(), deserializer.getSource(), record.value().toString());
+ } else {
+ kafkaMessage = new AtlasKafkaMessage(message,
record.offset(), record.topic(), record.partition(),
+ deserializer.getMsgCreated(),
deserializer.getSpooled(), deserializer.getSource());
+ }
+
+ messages.add(kafkaMessage);
}
}
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
index af3727df4..390eca7ba 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java
@@ -26,13 +26,25 @@ public class AtlasKafkaMessage<T> {
private final TopicPartition topicPartition;
private final boolean spooled;
private final long msgCreated;
+ private final String source;
+ private final String rawRecordData;
- public AtlasKafkaMessage(T message, long offset, String topic, int
partition, long msgCreated, boolean spooled) {
+ public AtlasKafkaMessage(T message, long offset, String topic, int
partition, long msgCreated, boolean spooled, String source, String
rawRecordData) {
this.message = message;
this.offset = offset;
this.topicPartition = new TopicPartition(topic, partition);
this.msgCreated = msgCreated;
this.spooled = spooled;
+ this.source = source;
+ this.rawRecordData = rawRecordData;
+ }
+
+ public AtlasKafkaMessage(T message, long offset, String topic, int
partition, long msgCreated, boolean spooled, String source) {
+ this(message, offset, topic, partition, msgCreated, spooled, source,
null);
+ }
+
+ public AtlasKafkaMessage(T message, long offset, String topic, int
partition, long msgCreated, boolean spooled) {
+ this(message, offset, topic, partition, msgCreated, spooled, null);
}
public AtlasKafkaMessage(T message, long offset, String topic, int
partition) {
@@ -66,4 +78,8 @@ public class AtlasKafkaMessage<T> {
public long getMsgCreated() {
return this.msgCreated;
}
+
+ public String getSource() { return this.source; }
+
+ public String getRawRecordData() { return this.rawRecordData; }
}
diff --git
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 32f5183a0..870d50814 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -58,6 +58,7 @@ public class KafkaNotification extends AbstractNotification
implements Service {
public static final Logger LOG =
LoggerFactory.getLogger(KafkaNotification.class);
public static final String PROPERTY_PREFIX = "atlas.kafka";
+ public static final String UNSORTED_POSTFIX = "_UNSORTED";
public static final String ATLAS_HOOK_TOPIC =
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
public static final String ATLAS_ENTITIES_TOPIC =
AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
@@ -67,9 +68,27 @@ public class KafkaNotification extends AbstractNotification
implements Service {
private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This
consumer has already been closed.";
+ public static String ATLAS_HOOK_TOPIC_UNSORTED;
+ public static String[] ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS;
+
+ static {
+ try {
+ ATLAS_HOOK_TOPIC_UNSORTED = ATLAS_HOOK_TOPIC + UNSORTED_POSTFIX;
+ ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS = ATLAS_HOOK_CONSUMER_TOPICS
!= null && ATLAS_HOOK_CONSUMER_TOPICS.length > 0
+ ? new String[ATLAS_HOOK_CONSUMER_TOPICS.length] : new
String[] {ATLAS_HOOK_TOPIC_UNSORTED};
+
+ for (int i = 0; i < ATLAS_HOOK_CONSUMER_TOPICS.length; i++) {
+ ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS[i] =
ATLAS_HOOK_CONSUMER_TOPICS[i] + UNSORTED_POSTFIX;
+ }
+ } catch (Exception e) {
+ LOG.error("Error while initializing Kafka Notification", e);
+ }
+ }
+
private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP =
new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
+ put(NotificationType.HOOK_UNSORTED, ATLAS_HOOK_TOPIC_UNSORTED);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
}
};
@@ -77,6 +96,7 @@ public class KafkaNotification extends AbstractNotification
implements Service {
private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP =
new HashMap<NotificationType, String[]>() {
{
put(NotificationType.HOOK,
trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS));
+ put(NotificationType.HOOK_UNSORTED,
trimAndPurge(ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS));
put(NotificationType.ENTITIES,
trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS));
}
};
@@ -86,6 +106,7 @@ public class KafkaNotification extends AbstractNotification
implements Service {
private final Map<NotificationType, List<KafkaConsumer>> consumers = new
HashMap<>();
private final Map<NotificationType, KafkaProducer> producers = new
HashMap<>();
private String
consumerClosedErrorMsg;
+ private final Map<String, KafkaProducer> producersByTopic
= new HashMap<>();
// ----- Constructors ----------------------------------------------------
@@ -255,6 +276,21 @@ public class KafkaNotification extends
AbstractNotification implements Service {
LOG.info("<== KafkaNotification.close()");
}
+ //Sending messages received through HTTP or REST Notification Service to
Producer
+ public void sendInternal(String topic, List<String> messages, boolean
isSortNeeded) throws NotificationException {
+ KafkaProducer producer;
+ if (isSortNeeded) {
+ topic = topic + UNSORTED_POSTFIX;
+ }
+ producer = getOrCreateProducer(topic);
+ sendInternalToProducer(producer, topic, messages);
+ }
+
+ public void sendInternal(String topic, List<String> messages) throws
NotificationException {
+ KafkaProducer producer = getOrCreateProducer(topic);
+
+ sendInternalToProducer(producer, topic, messages);
+ }
// ----- AbstractNotification --------------------------------------------
@Override
@@ -266,7 +302,11 @@ public class KafkaNotification extends
AbstractNotification implements Service {
@VisibleForTesting
void sendInternalToProducer(Producer p, NotificationType notificationType,
List<String> messages) throws NotificationException {
- String topic =
PRODUCER_TOPIC_MAP.get(notificationType);
+ String topic = PRODUCER_TOPIC_MAP.get(notificationType);
+ sendInternalToProducer(p, topic, messages);
+ }
+
+ void sendInternalToProducer(Producer p, String topic , List<String>
messages) throws NotificationException {
List<MessageContext> messageContexts = new ArrayList<>();
for (String message : messages) {
@@ -308,6 +348,9 @@ public class KafkaNotification extends AbstractNotification
implements Service {
public Properties getConsumerProperties(NotificationType notificationType)
{
// find the configured group id for the given notification type
String groupId =
properties.getProperty(notificationType.toString().toLowerCase() + "." +
CONSUMER_GROUP_ID_PROPERTY);
+ if (StringUtils.isEmpty(groupId)) {
+ groupId = "atlas";
+ }
if (StringUtils.isEmpty(groupId)) {
throw new IllegalStateException("No configuration group id set for
the notification type " + notificationType);
@@ -346,21 +389,45 @@ public class KafkaNotification extends
AbstractNotification implements Service {
private KafkaProducer getOrCreateProducer(NotificationType
notificationType) {
LOG.debug("==> KafkaNotification.getOrCreateProducer()");
- KafkaProducer ret = producers.get(notificationType);
+ KafkaProducer ret = getOrCreateProducerByCriteria(notificationType,
producers, false);
+
+ LOG.debug("<== KafkaNotification.getOrCreateProducer()");
+
+ return ret;
+ }
+
+ private KafkaProducer getOrCreateProducer(String topic) {
+ LOG.debug("==> KafkaNotification.getOrCreateProducer() by Topic");
+
+ KafkaProducer ret = getOrCreateProducerByCriteria(topic,
producersByTopic, true);
+
+ LOG.debug("<== KafkaNotification.getOrCreateProducer by Topic");
+
+ return ret;
+ }
+
+ private KafkaProducer getOrCreateProducerByCriteria(Object
producerCriteria, Map producersByCriteria, boolean fetchByTopic) {
+ LOG.debug("==> KafkaNotification.getOrCreateProducerByCriteria()");
+
+ if ((fetchByTopic && !(producerCriteria instanceof String)) ||
(!fetchByTopic && !(producerCriteria instanceof NotificationType))) {
+ LOG.error("Error while retrieving Producer due to invalid
criteria");
+ }
+
+ KafkaProducer ret = (KafkaProducer)
producersByCriteria.get(producerCriteria);
if (ret == null) {
synchronized (this) {
- ret = producers.get(notificationType);
+ ret = (KafkaProducer)
producersByCriteria.get(producerCriteria);
if (ret == null) {
ret = new KafkaProducer(properties);
- producers.put(notificationType, ret);
+ producersByCriteria.put(producerCriteria, ret);
}
}
}
- LOG.debug("<== KafkaNotification.getOrCreateProducer()");
+ LOG.debug("<== KafkaNotification.getOrCreateProducerByCriteria()");
return ret;
}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
index b35af97fd..9d8686257 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
@@ -17,26 +17,28 @@
*/
package org.apache.atlas.kafka;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
-import org.apache.atlas.notification.LogConfigUtils;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.rest.RestNotification;
import org.apache.atlas.notification.spool.AtlasFileSpool;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-
/**
* Provider class for Notification interfaces
*/
public class NotificationProvider {
private static final Logger LOG =
LoggerFactory.getLogger(NotificationProvider.class);
- private static final String CONF_ATLAS_HOOK_SPOOL_ENABLED =
"atlas.hook.spool.enabled";
- private static final String CONF_ATLAS_HOOK_SPOOL_DIR =
"atlas.hook.spool.dir";
+ @VisibleForTesting
+ public static final String CONF_ATLAS_HOOK_SPOOL_ENABLED =
"atlas.hook.spool.enabled";
+ private static final String CONF_ATLAS_HOOK_SPOOL_DIR =
"atlas.hook.spool.dir";
private static final boolean CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT = false;
@@ -45,25 +47,32 @@ public class NotificationProvider {
public static NotificationInterface get() {
if (notificationProvider == null) {
try {
- Configuration conf = ApplicationProperties.get();
- KafkaNotification kafka = new KafkaNotification(conf);
- String spoolDir = getSpoolDir(conf);
+ Configuration conf = ApplicationProperties.get();
+ String spoolDir = getSpoolDir(conf);
+ AbstractNotification absNotifier = null;
+
+ if (AtlasHook.isRESTNotificationEnabled) {
+ absNotifier = new RestNotification(conf);
+ } else {
+ absNotifier = new KafkaNotification(conf);
+ }
if (isSpoolingEnabled(conf) &&
StringUtils.isNotEmpty(spoolDir)) {
LOG.info("Notification spooling is enabled: spool
directory={}", spoolDir);
conf.setProperty(CONF_ATLAS_HOOK_SPOOL_DIR, spoolDir);
- notificationProvider = new AtlasFileSpool(conf, kafka);
+ notificationProvider = new AtlasFileSpool(conf,
absNotifier);
} else {
LOG.info("Notification spooling is not enabled");
- notificationProvider = kafka;
+ notificationProvider = absNotifier;
}
} catch (AtlasException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Error while initializing
Notification interface", e);
}
}
+ LOG.debug("NotificationInterface of type {} is enabled",
notificationProvider.getClass().getSimpleName());
return notificationProvider;
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 207747d7d..3048b9c95 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -65,6 +65,7 @@ public abstract class AtlasNotificationMessageDeserializer<T>
implements Message
private final AtomicLong
messageCountSinceLastInterval = new AtomicLong(0);
private long msgCreated;
private boolean spooled;
+ private String source;
// ----- Constructors ----------------------------------------------------
/**
@@ -112,6 +113,10 @@ public abstract class
AtlasNotificationMessageDeserializer<T> implements Message
return this.spooled;
}
+ public String getSource() {
+ return this.source;
+ }
+
@Override
public T deserialize(String messageJson) {
final T ret;
@@ -120,6 +125,7 @@ public abstract class
AtlasNotificationMessageDeserializer<T> implements Message
messageCountSinceLastInterval.incrementAndGet();
this.msgCreated = 0;
this.spooled = false;
+ this.source = null;
AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson,
AtlasNotificationMessage.class);
@@ -128,6 +134,7 @@ public abstract class
AtlasNotificationMessageDeserializer<T> implements Message
} else {
this.msgCreated = ((AtlasNotificationMessage)
msg).getMsgCreationTime();
this.spooled = ((AtlasNotificationMessage) msg).getSpooled();
+ this.source = msg.getSource() != null ?
msg.getSource().getSource() : null;
String msgJson = messageJson;
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 1fb9f9989..83af92b64 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -63,4 +63,11 @@ public interface NotificationConsumer<T> {
* @return List containing kafka message and partionId and offset.
*/
List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition,
Long> lastCommittedPartitionOffset);
+
+ /**
+ * Fetch raw data for the topics from Kafka, if lastCommittedOffset same as message
+ * received offset, it will proceed with commit.
+ * @return List containing kafka message and partitionId and offset.
+ */
+ List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset);
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index a9cd4a6bb..3fb616edb 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -46,6 +46,9 @@ public interface NotificationInterface {
// Notifications from the Atlas integration hooks.
HOOK(new HookMessageDeserializer()),
+ // Notifications from the Atlas integration hooks - unsorted.
+ HOOK_UNSORTED(new HookMessageDeserializer()),
+
// Notifications to entity change consumers.
ENTITIES(new EntityMessageDeserializer());
diff --git a/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java b/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java
new file mode 100644
index 000000000..fb598f899
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.rest;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+import static org.apache.atlas.kafka.KafkaNotification.ATLAS_ENTITIES_TOPIC;
+import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
+
+/**
+ * {@link AbstractNotification} implementation that publishes notifications by posting them,
+ * in size-bounded batches, to a topic through {@link AtlasClientV2#postNotificationToTopic}.
+ *
+ * This implementation only produces notifications; it does not create consumers.
+ */
+public class RestNotification extends AbstractNotification {
+    private static final Logger LOG                    = LoggerFactory.getLogger(RestNotification.class);
+    private static final int    BATCH_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES.getInt();
+    private static final String ATLAS_ENDPOINT         = "atlas.rest.address";
+    private static final String BASIC_AUTH_USERNAME    = "atlas.rest.basic.auth.username";
+    private static final String BASIC_AUTH_PASSWORD    = "atlas.rest.basic.auth.password";
+    private static final String DEFAULT_ATLAS_URL      = "http://localhost:31000/";
+
+    // Topic that each supported notification type is posted to.
+    private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<>();
+
+    static {
+        PRODUCER_TOPIC_MAP.put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
+        PRODUCER_TOPIC_MAP.put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
+    }
+
+    // Public and non-final so that tests can substitute a client backed by mocks.
+    @VisibleForTesting
+    public AtlasClientV2 atlasClientV2;
+
+    /**
+     * Creates the notifier and the REST client it posts through.
+     *
+     * @param configuration source of the endpoint and basic-auth settings
+     * @throws AtlasException if the REST client cannot be created
+     */
+    public RestNotification(Configuration configuration) throws AtlasException {
+        super();
+        setupAtlasClientV2(configuration);
+    }
+
+    /**
+     * Lazily creates the {@link AtlasClientV2}; an already-created client is returned unchanged.
+     *
+     * @param configuration source of {@code atlas.rest.address} and the basic-auth properties
+     * @return the client used to post notifications
+     * @throws AtlasException propagated unchanged from client creation
+     */
+    private AtlasClientV2 setupAtlasClientV2(Configuration configuration) throws AtlasException {
+        if (atlasClientV2 != null) {
+            return atlasClientV2;
+        }
+
+        String[] atlasEndPoint = configuration.getStringArray(ATLAS_ENDPOINT);
+
+        if (atlasEndPoint == null || atlasEndPoint.length == 0) {
+            atlasEndPoint = new String[] { DEFAULT_ATLAS_URL };
+        }
+
+        if (AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+            atlasClientV2 = new AtlasClientV2(atlasEndPoint);
+        } else {
+            // NOTE(review): both lookups fall back to hard-coded default credentials, so the
+            // interactive prompt below is presumably unreachable - confirm whether shipping
+            // default credentials here is intended.
+            String   fileAuthUsername          = configuration.getString(BASIC_AUTH_USERNAME, "admin");
+            String   fileAuthPassword          = configuration.getString(BASIC_AUTH_PASSWORD, "admin123");
+            String[] basicAuthUsernamePassword = (fileAuthUsername == null || fileAuthPassword == null)
+                    ? AuthenticationUtil.getBasicAuthenticationInput()
+                    : new String[] { fileAuthUsername, fileAuthPassword };
+
+            atlasClientV2 = new AtlasClientV2(atlasEndPoint, basicAuthUsernamePassword);
+        }
+
+        return atlasClientV2;
+    }
+
+    /**
+     * Posts the messages to the topic mapped to {@code type}, one REST call per batch.
+     *
+     * @param type     notification type; must be one of the types in {@code PRODUCER_TOPIC_MAP}
+     * @param messages serialized messages; {@code null} or empty results in no REST call
+     * @throws NotificationException    if the failure message carries the NOTIFICATION_EXCEPTION error code
+     * @throws IllegalArgumentException if no topic is mapped to {@code type}
+     * @throws RuntimeException         wrapping any other {@link AtlasServiceException}
+     */
+    @Override
+    public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
+        String topic = PRODUCER_TOPIC_MAP.get(type);
+
+        // Fail fast instead of posting to a topic literally named "null".
+        if (topic == null) {
+            throw new IllegalArgumentException("No REST producer topic is mapped to notification type " + type);
+        }
+
+        if (messages == null || messages.isEmpty()) {
+            return;
+        }
+
+        int batchCounter = 0;
+
+        try {
+            for (List<String> batch : getBatches(messages)) {
+                batchCounter++;
+                atlasClientV2.postNotificationToTopic(topic, batch);
+            }
+        } catch (AtlasServiceException e) {
+            String errorMessage = e.getMessage();
+
+            if (errorMessage != null && errorMessage.contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode())) {
+                LOG.error("Sending notifications through REST interface failed starting from batch# {}", batchCounter, e);
+
+                throw new NotificationException(e);
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Splits the messages, preserving order, into batches whose combined UTF-8 size stays within
+     * {@code BATCH_MAX_LENGTH_BYTES}. A single message larger than the limit gets a batch of its own.
+     *
+     * @param messages messages to split
+     * @return the non-empty batches; an empty list when there are no messages
+     */
+    private List<List<String>> getBatches(List<String> messages) {
+        List<List<String>> batches   = new ArrayList<>();
+        List<String>       batch     = new ArrayList<>();
+        int                batchSize = 0;
+
+        for (String message : messages) {
+            byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(message);
+
+            // Close the current batch when adding this message would push it over the limit.
+            if (batchSize > 0 && batchSize + msgBytes.length > BATCH_MAX_LENGTH_BYTES) {
+                batches.add(batch);
+
+                batch     = new ArrayList<>();
+                batchSize = 0;
+            }
+
+            batch.add(message);
+            batchSize += msgBytes.length;
+        }
+
+        if (!batch.isEmpty()) {
+            batches.add(batch);
+        }
+
+        return batches;
+    }
+
+    /**
+     * This implementation does not create consumers.
+     *
+     * @return an empty list, never {@code null}, so callers can iterate safely
+     */
+    @Override
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
+        return new ArrayList<>();
+    }
+
+    /** Nothing to release in this class. */
+    @Override
+    public void close() {
+    }
+
+    /** Always reports ready. */
+    @Override
+    public boolean isReady(NotificationType type) {
+        return true;
+    }
+}
diff --git
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 1b486e528..aee59a395 100644
---
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -229,6 +229,11 @@ public class AbstractNotificationConsumerTest {
public List<AtlasKafkaMessage<TestMessage>>
receiveWithCheckedCommit(Map<TopicPartition, Long>
lastCommittedPartitionOffset) {
return receive();
}
+
+ // Test stub: this test consumer does not implement raw-record consumption and returns null.
+ @Override
+ public List<AtlasKafkaMessage<TestMessage>>
receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long>
lastCommittedPartitionOffset) {
+ return null;
+ }
}
public static class TestMessageDeserializer extends
AbstractMessageDeserializer<TestMessage> {
diff --git
a/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java
b/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java
new file mode 100644
index 000000000..476518df6
--- /dev/null
+++
b/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasBaseClient;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.notification.rest.RestNotification;
+import org.apache.commons.configuration.Configuration;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link RestNotification}: provider selection and posting notifications
+ * through an {@link AtlasClientV2} backed by a mocked Jersey {@link WebResource}.
+ */
+public class RestNotificationTest {
+    private NotificationInterface notifier;
+    private Configuration         conf;
+
+    @Mock
+    private WebResource service;
+
+    @Mock
+    private WebResource.Builder resourceBuilderMock;
+
+    /** Enables REST hook notifications, disables spooling, and obtains the notifier under test. */
+    @BeforeClass
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        conf = ApplicationProperties.get();
+
+        conf.setProperty(AtlasConfiguration.NOTIFICATION_HOOK_REST_ENABLED.getPropertyName(), true);
+        conf.setProperty(NotificationProvider.CONF_ATLAS_HOOK_SPOOL_ENABLED, false);
+
+        notifier = NotificationProvider.get();
+    }
+
+    /** Routes both the raw and the normalized API path of {@code webResource} to the mocked service. */
+    private WebResource.Builder setupBuilder(AtlasClientV2.API api, WebResource webResource) {
+        when(webResource.path(api.getPath())).thenReturn(service);
+        when(webResource.path(api.getNormalizedPath())).thenReturn(service);
+
+        return getBuilder(service);
+    }
+
+    /** Stubs the request-builder chain of {@code resourceObject} so every step returns the mocked builder. */
+    private WebResource.Builder getBuilder(WebResource resourceObject) {
+        when(resourceObject.getRequestBuilder()).thenReturn(resourceBuilderMock);
+        when(resourceObject.path(anyString())).thenReturn(resourceObject);
+        when(resourceBuilderMock.accept(MediaType.APPLICATION_JSON)).thenReturn(resourceBuilderMock);
+        when(resourceBuilderMock.type(MediaType.MULTIPART_FORM_DATA)).thenReturn(resourceBuilderMock);
+        when(resourceBuilderMock.type(MediaType.APPLICATION_JSON + "; charset=UTF-8")).thenReturn(resourceBuilderMock);
+
+        return resourceBuilderMock;
+    }
+
+    /** With REST hook notifications enabled, the provider must hand out a {@link RestNotification}. */
+    @Test
+    public void testNotificationProvider () throws Exception {
+        assertEquals(notifier.getClass(), RestNotification.class);
+    }
+
+    /** A NO_CONTENT response to the POST must not surface as a {@link NotificationException}. */
+    @Test
+    public void testPostNotificationToTopic () throws Exception {
+        AtlasClientV2       client   = new AtlasClientV2(service, conf);
+        AtlasBaseClient.API api      = client.formatPathWithParameter(AtlasClientV2.API_V2.POST_NOTIFICATIONS_TO_TOPIC, ATLAS_HOOK_TOPIC);
+        WebResource.Builder builder  = setupBuilder(api, service);
+        ClientResponse      response = mock(ClientResponse.class);
+
+        when(response.getStatus()).thenReturn(Response.Status.NO_CONTENT.getStatusCode());
+        when(builder.method(anyString(), Matchers.<Class>any(), anyList())).thenReturn(response);
+
+        ((RestNotification)notifier).atlasClientV2 = client;
+
+        try {
+            ((RestNotification)notifier).sendInternal(NotificationInterface.NotificationType.HOOK, new ArrayList<String>(Arrays.asList("Dummy")));
+        } catch (NotificationException e) {
+            Assert.fail("Failed with Exception");
+        }
+    }
+
+    /** A response carrying the NOTIFICATION_EXCEPTION error code must surface as a {@link NotificationException}. */
+    @Test
+    public void testNotificationException () throws Exception {
+        AtlasClientV2       client   = new AtlasClientV2(service, conf);
+        AtlasBaseClient.API api      = client.formatPathWithParameter(AtlasClientV2.API_V2.POST_NOTIFICATIONS_TO_TOPIC, ATLAS_HOOK_TOPIC);
+        WebResource.Builder builder  = setupBuilder(api, service);
+        ClientResponse      response = mock(ClientResponse.class);
+
+        when(response.getStatus()).thenReturn(AtlasErrorCode.NOTIFICATION_EXCEPTION.getHttpCode().getStatusCode());
+        when(response.getEntity(String.class)).thenReturn(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode());
+        when(builder.method(anyString(), Matchers.<Class>any(), anyList())).thenReturn(response);
+
+        ((RestNotification)notifier).atlasClientV2 = client;
+
+        try {
+            ((RestNotification)notifier).sendInternal(NotificationInterface.NotificationType.HOOK, new ArrayList<String>(Arrays.asList("Dummy")));
+
+            // Without this the test would pass vacuously when no exception is raised at all.
+            Assert.fail("Expected NotificationException was not thrown");
+        } catch (NotificationException e) {
+            Assert.assertTrue(e.getMessage().contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode()));
+        }
+    }
+
+}
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1cdfcef8a..936423aa2 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -22,7 +22,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.*;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -83,6 +85,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -91,6 +94,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -123,6 +127,9 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private static final String
EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
+ private static final String ATLAS_HOOK_CONSUMER_THREAD_NAME =
"atlas-hook-consumer-thread";
+ private static final String ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME =
"atlas-hook-unsorted-consumer-thread";
+
// from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
public static final String DUMMY_DATABASE =
"_dummy_database";
public static final String DUMMY_TABLE = "_dummy_table";
@@ -195,6 +202,8 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private Instant nextStatsLogTime =
AtlasMetricsCounter.getNextHourStartTime(Instant.now());
private final Map<TopicPartition, Long> lastCommittedPartitionOffset;
private final EntityCorrelationManager entityCorrelationManager;
+ private final long consumerMsgBufferingIntervalMS;
+ private final int consumerMsgBufferingBatchSize;
@VisibleForTesting
final int consumerRetryInterval;
@@ -230,6 +239,8 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
largeMessageProcessingTimeThresholdMs =
applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms",
60 * 1000); // 60 sec by default
createShellEntityForNonExistingReference =
AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
authorizeUsingMessageUser =
applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false);
+ consumerMsgBufferingIntervalMS =
AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL.getInt() *
1000;
+ consumerMsgBufferingBatchSize =
AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE.getInt();
int authnCacheTtlSeconds =
applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300);
@@ -350,17 +361,35 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
}
private void startConsumers(ExecutorService executorService) {
- int numThreads =
applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+ int
numThreads =
applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
+ Map<NotificationConsumer<HookNotification>, NotificationType>
notificationConsumersByType = new HashMap<>();
+
List<NotificationConsumer<HookNotification>> notificationConsumers =
notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
+ for (NotificationConsumer<HookNotification> notificationConsumer :
notificationConsumers) {
+ notificationConsumersByType.put(notificationConsumer,
NotificationType.HOOK);
+ }
+
+ if (AtlasHook.isHookMsgsSortEnabled) {
+ List<NotificationConsumer<HookNotification>>
unsortedNotificationConsumers =
notificationInterface.createConsumers(NotificationType.HOOK_UNSORTED,
numThreads);
+ for (NotificationConsumer<HookNotification>
unsortedNotificationConsumer : unsortedNotificationConsumers) {
+ notificationConsumersByType.put(unsortedNotificationConsumer,
NotificationType.HOOK_UNSORTED);
+ }
+ }
if (executorService == null) {
- executorService =
Executors.newFixedThreadPool(notificationConsumers.size(), new
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
+ executorService =
Executors.newFixedThreadPool(notificationConsumersByType.size(), new
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
}
executors = executorService;
- for (final NotificationConsumer<HookNotification> consumer :
notificationConsumers) {
- HookConsumer hookConsumer = new HookConsumer(consumer);
+ for (final NotificationConsumer<HookNotification> consumer :
notificationConsumersByType.keySet()) {
+ String hookConsumerName = ATLAS_HOOK_CONSUMER_THREAD_NAME;
+
+ if
(notificationConsumersByType.get(consumer).equals(NotificationType.HOOK_UNSORTED))
{
+ hookConsumerName = ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME;
+ }
+
+ HookConsumer hookConsumer = new HookConsumer(hookConsumerName,
consumer);
consumers.add(hookConsumer);
executors.submit(hookConsumer);
@@ -529,8 +558,16 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private final List<String> failedMessages =
new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter =
new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
+ private int duplicateKeyCounter = 1;
+
public HookConsumer(NotificationConsumer<HookNotification> consumer) {
- super("atlas-hook-consumer-thread");
+ super(ATLAS_HOOK_CONSUMER_THREAD_NAME);
+
+ this.consumer = consumer;
+ }
+
+ /**
+ * Creates a hook consumer thread with an explicit thread name.
+ *
+ * @param consumerThreadName name given to the thread; run() compares it against
+ * ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME to choose the
+ * buffer-and-sort path over direct message handling
+ * @param consumer consumer the thread receives messages from
+ */
+ public HookConsumer(String consumerThreadName,
NotificationConsumer<HookNotification> consumer) {
+ super(consumerThreadName);
this.consumer = consumer;
}
@@ -548,10 +585,15 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
try {
while (shouldRun.get()) {
try {
- List<AtlasKafkaMessage<HookNotification>> messages =
consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
-
- for (AtlasKafkaMessage<HookNotification> msg :
messages) {
- handleMessage(msg);
+ if
(StringUtils.equals(ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME, this.getName())) {
+ long
msgBufferingStartTime = System.currentTimeMillis();
+ Map<String,AtlasKafkaMessage<HookNotification>>
msgBuffer = new TreeMap<>();
+
sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, msgBuffer);
+ } else {
+ List<AtlasKafkaMessage<HookNotification>> messages
= consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
+ for (AtlasKafkaMessage<HookNotification> msg :
messages) {
+ handleMessage(msg);
+ }
}
} catch (IllegalStateException ex) {
adaptiveWaiter.pause(ex);
@@ -576,6 +618,63 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
}
}
+ /** Resets the suffix counter used to disambiguate duplicate buffer keys back to 1. */
+ private void resetDuplicateKeyCounter() {
+ duplicateKeyCounter = 1;
+ }
+
+ /**
+ * Builds a buffer key by joining the two parts with an underscore.
+ *
+ * @param msgCreated first key part (sortMessages passes the message creation time, or an existing key)
+ * @param source second key part (sortMessages passes the message source, or a duplicate counter)
+ * @return the two parts formatted as "msgCreated_source"
+ */
+ private String getKey(String msgCreated, String source) {
+ return String.format("%s_%s", msgCreated, source);
+ }
+
+        /**
+         * Adds a message to the buffer under a key built from its creation time and source,
+         * so that iterating the buffer in key order yields the messages in sorted order.
+         *
+         * NOTE(review): the run loop passes a TreeMap with String keys, so keys are compared as
+         * strings - ordering by creation time presumably relies on the timestamps having the
+         * same number of digits; confirm.
+         *
+         * @param msg       message to buffer
+         * @param msgBuffer buffer the message is added to
+         */
+        private void sortMessages(AtlasKafkaMessage<HookNotification> msg, Map<String, AtlasKafkaMessage<HookNotification>> msgBuffer) {
+            String key = getKey(Long.toString(msg.getMsgCreated()), msg.getSource());
+
+            // Duplicate keys are possible for messages from the same source with the same creation time.
+            // Loop rather than test once: a suffixed key can itself collide with an existing key (for
+            // example a source name that already ends in "_<n>"), and put() would then silently
+            // overwrite - and lose - a buffered message.
+            while (msgBuffer.containsKey(key)) {
+                key = getKey(key, Integer.toString(duplicateKeyCounter));
+                duplicateKeyCounter++;
+            }
+
+            msgBuffer.put(key, msg);
+        }
+
+        /**
+         * Buffers raw records received from the consumer, then re-publishes them in buffer-key order
+         * to the hook topic and commits them.
+         *
+         * Records are received repeatedly until the buffer holds at least
+         * {@code consumerMsgBufferingBatchSize} messages or {@code consumerMsgBufferingIntervalMS}
+         * milliseconds have passed since {@code msgBufferingStartTime}.
+         *
+         * @param msgBufferingStartTime time, in epoch milliseconds, at which buffering started
+         * @param msgBuffer             buffer the received messages are sorted into; cleared once
+         *                              everything has been published and committed
+         * @throws NotificationException if publishing a message to the hook topic fails
+         */
+        void sortAndPublishMsgsToAtlasHook(long msgBufferingStartTime, Map<String, AtlasKafkaMessage<HookNotification>> msgBuffer) throws NotificationException {
+            // Buffer iteratively: recursing once per receive call would add a stack frame for every
+            // poll made within the buffering interval.
+            do {
+                List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveRawRecordsWithCheckedCommit(lastCommittedPartitionOffset);
+
+                if (messages != null) {
+                    for (AtlasKafkaMessage<HookNotification> message : messages) {
+                        sortMessages(message, msgBuffer);
+                    }
+                }
+            } while (msgBuffer.size() < consumerMsgBufferingBatchSize && System.currentTimeMillis() - msgBufferingStartTime < consumerMsgBufferingIntervalMS);
+
+            AtlasKafkaMessage<HookNotification> maxOffsetMsg       = null;
+            long                                maxOffsetProcessed = 0;
+
+            for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) {
+                // The target topic is the part of the source topic name before the unsorted postfix.
+                String hookTopic = StringUtils.isNotEmpty(msg.getTopic()) ? msg.getTopic().split(KafkaNotification.UNSORTED_POSTFIX)[0] : KafkaNotification.ATLAS_HOOK_TOPIC;
+
+                if (maxOffsetProcessed == 0 || maxOffsetProcessed < msg.getOffset()) {
+                    maxOffsetMsg       = msg;
+                    maxOffsetProcessed = msg.getOffset();
+                }
+
+                // NOTE(review): assumes notificationInterface is a KafkaNotification; any other
+                // implementation would fail this cast.
+                ((KafkaNotification) notificationInterface).sendInternal(hookTopic,
+                        StringUtils.isNotEmpty(msg.getRawRecordData()) ? Arrays.asList(msg.getRawRecordData()) : Arrays.asList(msg.getMessage().toString()));
+            }
+
+            // If publishing the sorted messages (loop above) fails, consumption of the unsorted
+            // messages should restart from the initial offset. The commits are therefore kept in a
+            // separate loop, so nothing is committed until the whole batch has been sent to the hook topic.
+            for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) {
+                commit(msg);
+            }
+
+            // The buffer is iterated in key order, not offset order, so commit the highest offset last.
+            if (maxOffsetMsg != null) {
+                commit(maxOffsetMsg);
+            }
+
+            msgBuffer.clear();
+            resetDuplicateKeyCounter();
+        }
+
@VisibleForTesting
void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
AtlasPerfTracer perf = null;
diff --git
a/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java
b/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java
new file mode 100644
index 000000000..295579e46
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.rest;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.authorize.AtlasAdminAccessRequest;
+import org.apache.atlas.authorize.AtlasAuthorizationUtils;
+import org.apache.atlas.authorize.AtlasPrivilege;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.web.util.Servlets;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * REST resource, rooted at "v2/notification", that accepts notification messages over HTTP
+ * and publishes them to a notification topic.
+ */
+@Path("v2/notification")
+@Singleton
+@Service
+@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+public class NotificationREST {
+ private static final Logger LOG
= LoggerFactory.getLogger(NotificationREST.class);
+ public static final String ATLAS_HOOK_TOPIC
= AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
+ public static final String ATLAS_ENTITIES_TOPIC
= AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
+ private static final String[]
ATLAS_HOOK_CONSUMER_TOPICS =
AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
+ private static final String[]
ATLAS_ENTITIES_CONSUMER_TOPICS =
AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
+ // Topic names accepted by handleNotifications; filled once by the static initializer below.
+ private static final Set<String> TOPICS
= new HashSet<>();
+
+ private final NotificationInterface notificationInterface;
+
+ // The accepted topics are the configured hook and entities consumer topic names.
+ static {
+ TOPICS.addAll(Arrays.asList(ATLAS_HOOK_CONSUMER_TOPICS));
+ TOPICS.addAll(Arrays.asList(ATLAS_ENTITIES_CONSUMER_TOPICS));
+ }
+
+ /**
+ * @param notificationInterface notifier used to publish the posted messages
+ */
+ @Inject
+ public NotificationREST(NotificationInterface notificationInterface) {
+ this.notificationInterface = notificationInterface;
+ }
+
+ /**
+ * Publishes the notifications in the request body to the given topic.
+ *
+ * The caller must hold the SERVICE_NOTIFICATION_POST admin privilege and the topic must be one
+ * of the configured hook or entities consumer topics. The body is parsed as a JSON array with
+ * one message per element; a body that cannot be parsed that way is sent as a single message.
+ *
+ * @param topicName name of the topic to publish to
+ * @param request HTTP request whose payload carries the messages
+ * @throws AtlasBaseException with INVALID_TOPIC_NAME for an unknown topic, or with
+ * NOTIFICATION_EXCEPTION (listing the failed messages, one per line)
+ * when publishing fails
+ * @throws IOException presumably raised while reading the request payload - confirm
+ */
+ @POST
+ @Path("/topic/{topicName}")
+ @Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
+ public void handleNotifications(@PathParam("topicName") String topicName,
@Context HttpServletRequest request) throws AtlasBaseException, IOException {
+ LOG.debug("Handling notifications for topic {}", topicName);
+ AtlasAuthorizationUtils.verifyAccess(new
AtlasAdminAccessRequest(AtlasPrivilege.SERVICE_NOTIFICATION_POST), "post on
rest notification service");
+
+ if (!TOPICS.contains(topicName)) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_TOPIC_NAME,
topicName);
+ }
+
+ String messagesAsJson = Servlets.getRequestPayload(request);
+ List<String> messages = getMessagesToNotify(messagesAsJson);
+
+ try {
+ // NOTE(review): assumes the injected NotificationInterface is a KafkaNotification;
+ // any other implementation would fail this cast.
+ KafkaNotification notifier = (KafkaNotification)
notificationInterface;
+ notifier.sendInternal(topicName, messages,
AtlasHook.isHookMsgsSortEnabled);
+
+ } catch (NotificationException exception) {
+ List<String> failedMessages = exception.getFailedMessages();
+ String concatenatedMessage =
StringUtils.join(failedMessages, "\n");
+
+ throw new
AtlasBaseException(AtlasErrorCode.NOTIFICATION_EXCEPTION, exception,
concatenatedMessage);
+ }
+
+ }
+
+ /**
+ * Splits a request payload into individual messages.
+ *
+ * @param messagesAsJson raw request payload
+ * @return one JSON string per element when the payload parses as a JSON array; otherwise a
+ * single-element list holding the payload unchanged
+ */
+ private List<String> getMessagesToNotify(String messagesAsJson) {
+ List<String> messages = new ArrayList<>();
+
+ try {
+ ArrayNode messageNodes =
AtlasJson.parseToV1ArrayNode(messagesAsJson);
+ for (JsonNode messageNode : messageNodes) {
+ messages.add(AtlasJson.toV1Json(messageNode));
+ }
+ } catch (IOException e) {
+ // Deliberate fallback rather than a failure: forward the payload as one message.
+ messages.add(messagesAsJson);
+ }
+
+ return messages;
+ }
+
+}
\ No newline at end of file
diff --git
a/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java
b/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java
new file mode 100644
index 000000000..2c907598d
--- /dev/null
+++
b/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.web.integration;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+
+import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Integration tests for posting hook notifications to a running Atlas server through
+ * {@link AtlasClientV2#postNotificationToTopic}.
+ */
+public class NotificationRestIT extends BaseResourceIT {
+
+    /** Posting with a wrong password must fail with a service exception that carries a status. */
+    @Test
+    public void unAuthPostNotification() throws IOException {
+        AtlasClientV2 unAuthClient = new AtlasClientV2(atlasUrls, new String[]{"admin", "wr0ng_pa55w0rd"});
+
+        try {
+            unAuthClient.postNotificationToTopic(ATLAS_HOOK_TOPIC, new ArrayList<String>(Arrays.asList("Dummy")));
+
+            // Without this the test would pass vacuously if the server accepted the request.
+            org.testng.Assert.fail("expected AtlasServiceException when posting with invalid credentials");
+        } catch (AtlasServiceException e) {
+            assertNotNull(e.getStatus(), "expected server error code in the status");
+        }
+    }
+
+    /** Posts a create-db notification and waits until the database entity can be found by DSL search. */
+    @Test
+    public void postNotificationBasicTest() throws Exception {
+        String dbName        = "db_" + randomString();
+        String clusterName   = "cl" + randomString();
+        String qualifiedName = dbName + "@" + clusterName;
+
+        // The placeholders are literal text, so use literal replace() rather than regex replaceAll().
+        String notificationString = TestResourceFileUtils.getJson("notifications/create-db")
+                .replace("--name--", dbName)
+                .replace("--clName--", clusterName)
+                .replace("\"--ts--\"", String.valueOf(System.currentTimeMillis()));
+
+        // Any AtlasServiceException propagates and fails the test; catching it here would let a
+        // failure whose status is null pass silently.
+        atlasClientV2.postNotificationToTopic(ATLAS_HOOK_TOPIC, new ArrayList<String>(Arrays.asList(notificationString)));
+
+        waitFor(MAX_WAIT_TIME, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                ArrayNode results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, qualifiedName));
+
+                return results.size() == 1;
+            }
+        });
+    }
+}
diff --git a/webapp/src/test/resources/json/notifications/create-db-ddl.json
b/webapp/src/test/resources/json/notifications/create-db-ddl.json
new file mode 100644
index 000000000..443495dde
--- /dev/null
+++ b/webapp/src/test/resources/json/notifications/create-db-ddl.json
@@ -0,0 +1 @@
+{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.100.78","msgCreatedBy":"hive","msgCreationTime":"--ts--","spooled":false,"message":{"type":"ENTITY_CREATE_V2","user":"root","entities":{"entities":[{"typeName":"hive_db_ddl","attributes":{"serviceType":"hive","qualifiedName":"--name--@--clName--:--execTime--","execTime":"--execTime--","queryText":"create
database --name--","name":"create database --name--" [...]
\ No newline at end of file
diff --git a/webapp/src/test/resources/json/notifications/create-db.json
b/webapp/src/test/resources/json/notifications/create-db.json
new file mode 100644
index 000000000..8df0e4dcc
--- /dev/null
+++ b/webapp/src/test/resources/json/notifications/create-db.json
@@ -0,0 +1 @@
+{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.10.4","msgCreatedBy":"hive","msgCreationTime":"--ts--","message":{"type":"ENTITY_CREATE_V2","user":"hive","entities":{"referredEntities":{},"entities":[{"typeName":"hive_db","attributes":{"owner":"admin","ownerType":"USER","managedLocation":null,"qualifiedName":"--name--@--clName--","clusterName":"--clName--","name":"--name--","location":"some_location","p
[...]
\ No newline at end of file
diff --git a/webapp/src/test/resources/json/notifications/delete-db.json
b/webapp/src/test/resources/json/notifications/delete-db.json
new file mode 100644
index 000000000..26e82e917
--- /dev/null
+++ b/webapp/src/test/resources/json/notifications/delete-db.json
@@ -0,0 +1 @@
+{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.72.140","msgCreatedBy":"hive","msgCreationTime":"--ts--","spooled":false,"message":{"type":"ENTITY_DELETE_V2","user":"hive","entities":[{"typeName":"hive_db","uniqueAttributes":{"qualifiedName":"--name--@--clName--"}}]}}
\ No newline at end of file