Repository: atlas
Updated Branches:
  refs/heads/master 3af543640 -> d541a3786
ATLAS-2075: notification enhancement to handle large messages, using compression and multi-part messages

(cherry picked from commit 99243ee8e18656acd72601468c99e7781a0b04f7)

Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/d541a378
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/d541a378
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/d541a378

Branch: refs/heads/master
Commit: d541a378678c1fc2b9640a65d38f5d9bec20cf14
Parents: 3af5436
Author: Madhan Neethiraj <[email protected]>
Authored: Wed Sep 27 20:42:25 2017 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Sat Sep 30 04:57:34 2017 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/AtlasConfiguration.java | 7 +
 .../apache/atlas/kafka/AtlasKafkaConsumer.java | 4 +
 .../apache/atlas/kafka/KafkaNotification.java | 4 +-
 .../AbstractMessageDeserializer.java | 14 +-
 .../notification/AbstractNotification.java | 113 +++++++++-
 .../AtlasNotificationBaseMessage.java | 194 ++++++++++++++++
 .../notification/AtlasNotificationMessage.java | 50 +++++
 .../AtlasNotificationMessageDeserializer.java | 225 +++++++++++++++++++
 .../AtlasNotificationStringMessage.java | 60 +++++
 .../atlas/notification/MessageVersion.java | 3 +
 .../notification/NotificationInterface.java | 4 +-
 .../atlas/notification/VersionedMessage.java | 75 -------
 .../VersionedMessageDeserializer.java | 105 ---------
 .../apache/atlas/kafka/KafkaConsumerTest.java | 11 +-
 .../atlas/kafka/KafkaNotificationMockTest.java | 18 +-
 .../AbstractNotificationConsumerTest.java | 41 ++--
 .../notification/AbstractNotificationTest.java | 33 +--
 .../AtlasNotificationMessageTest.java | 57 +++++
 .../notification/VersionedMessageTest.java | 57 -----
 .../entity/EntityMessageDeserializerTest.java | 16 +-
 .../hook/HookMessageDeserializerTest.java | 134 +++++++++--
 21 files changed, 895 insertions(+), 330 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/common/src/main/java/org/apache/atlas/AtlasConfiguration.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java index 9a9bb76..451bd9d 100644 --- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -33,6 +33,9 @@ public enum AtlasConfiguration { QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024), + NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)), + NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true), + //search configuration SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000), SEARCH_DEFAULT_LIMIT("atlas.search.defaultlimit", 100); @@ -63,6 +66,10 @@ public enum AtlasConfiguration { return APPLICATION_PROPERTIES.getLong(propertyName, Long.valueOf(defaultValue.toString()).longValue()); } + public boolean getBoolean() { + return APPLICATION_PROPERTIES.getBoolean(propertyName, Boolean.valueOf(defaultValue.toString()).booleanValue()); + } + public String getString() { return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString()); } http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index d3b4e49..e3bb71c 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ 
-71,6 +71,10 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { T message = deserializer.deserialize(record.value().toString()); + if (message == null) { + continue; + } + messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition())); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 38889ef..6bb8d73 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -202,7 +202,7 @@ public class KafkaNotification extends AbstractNotification implements Service { // ----- AbstractNotification -------------------------------------------- @Override - public void sendInternal(NotificationType type, String... 
messages) throws NotificationException { + public void sendInternal(NotificationType type, List<String> messages) throws NotificationException { if (producer == null) { createProducer(); } @@ -210,7 +210,7 @@ public class KafkaNotification extends AbstractNotification implements Service { } @VisibleForTesting - void sendInternalToProducer(Producer p, NotificationType type, String[] messages) throws NotificationException { + void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException { String topic = TOPIC_MAP.get(type); List<MessageContext> messageContexts = new ArrayList<>(); for (String message : messages) { http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java index ec99372..37a57d1 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java @@ -44,7 +44,7 @@ import java.util.Map; /** * Base notification message deserializer. */ -public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDeserializer<T> { +public abstract class AbstractMessageDeserializer<T> extends AtlasNotificationMessageDeserializer<T> { private static final Map<Type, JsonDeserializer> DESERIALIZER_MAP = new HashMap<>(); @@ -63,16 +63,16 @@ public abstract class AbstractMessageDeserializer<T> extends VersionedMessageDes /** * Create a deserializer. 
* - * @param versionedMessageType the type of the versioned message - * @param expectedVersion the expected message version - * @param deserializerMap map of individual deserializers used to define this message deserializer - * @param notificationLogger logger for message version mismatch + * @param notificationMessageType the type of the notification message + * @param expectedVersion the expected message version + * @param deserializerMap map of individual deserializers used to define this message deserializer + * @param notificationLogger logger for message version mismatch */ - public AbstractMessageDeserializer(Type versionedMessageType, + public AbstractMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, Map<Type, JsonDeserializer> deserializerMap, Logger notificationLogger) { - super(versionedMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger); + super(notificationMessageType, expectedVersion, getDeserializer(deserializerMap), notificationLogger); } http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index cb44fc6..1f9404d 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -26,21 +26,34 @@ import com.google.gson.JsonSerializationContext; import com.google.gson.JsonSerializer; import org.apache.atlas.AtlasException; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; import org.apache.atlas.typesystem.IReferenceableInstance; import 
org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.configuration.Configuration; import org.codehaus.jettison.json.JSONArray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED; +import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES; /** * Abstract notification interface implementation. */ public abstract class AbstractNotification implements NotificationInterface { + private static final Logger LOG = LoggerFactory.getLogger(AbstractNotification.class); + + private static String msgIdPrefix = UUID.randomUUID().toString(); + private static AtomicInteger msgIdSuffix = new AtomicInteger(0); /** * The current expected version for notification messages. 
@@ -48,6 +61,9 @@ public abstract class AbstractNotification implements NotificationInterface { public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0"); public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; + + public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode upto 4 bytes in UTF-8 + private final boolean embedded; private final boolean isHAEnabled; @@ -77,10 +93,12 @@ public abstract class AbstractNotification implements NotificationInterface { @Override public <T> void send(NotificationType type, List<T> messages) throws NotificationException { - String[] strMessages = new String[messages.size()]; + List<String> strMessages = new ArrayList<>(messages.size()); + for (int index = 0; index < messages.size(); index++) { - strMessages[index] = getMessageJson(messages.get(index)); + createNotificationMessages(messages.get(index), strMessages); } + sendInternal(type, strMessages); } @@ -117,11 +135,17 @@ public abstract class AbstractNotification implements NotificationInterface { * * @throws NotificationException if an error occurs while sending */ - protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException; + protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException; // ----- utility methods ------------------------------------------------- + public static String getMessageJson(Object message) { + AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message); + + return GSON.toJson(notificationMsg); + } + /** * Get the notification message JSON from the given object. 
* @@ -129,10 +153,75 @@ public abstract class AbstractNotification implements NotificationInterface { * * @return the message as a JSON string */ - public static String getMessageJson(Object message) { - VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message); + public static void createNotificationMessages(Object message, List<String> msgJsonList) { + AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message); + String msgJson = GSON.toJson(notificationMsg); + + boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES; + + if (msgLengthExceedsLimit) { // get utf-8 bytes for msgJson and check for length limit again + byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson); + + msgLengthExceedsLimit = msgBytes.length > MESSAGE_MAX_LENGTH_BYTES; + + if (msgLengthExceedsLimit) { + String msgId = getNextMessageId(); + CompressionKind compressionKind = CompressionKind.NONE; + + if (MESSAGE_COMPRESSION_ENABLED) { + byte[] encodedBytes = AtlasNotificationBaseMessage.gzipCompressAndEncodeBase64(msgBytes); + + compressionKind = CompressionKind.GZIP; + + LOG.info("Compressed large message: msgID={}, uncompressed={} bytes, compressed={} bytes", msgId, msgBytes.length, encodedBytes.length); + + msgLengthExceedsLimit = encodedBytes.length > MESSAGE_MAX_LENGTH_BYTES; - return GSON.toJson(versionedMessage); + if (!msgLengthExceedsLimit) { // no need to split + AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind); + + msgJson = GSON.toJson(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above + msgBytes = null; // not used after this point + } else { // encodedBytes will be split + msgJson = null; // not used after this point + msgBytes = encodedBytes; + } + } + + if (msgLengthExceedsLimit) { + // compressed messages are 
already base64-encoded + byte[] encodedBytes = compressionKind != CompressionKind.NONE ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes); + + int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES; + + if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) { + splitCount++; + } + + LOG.info("Splitting large message: msgID={}, length={} bytes, splitCount={}", msgId, encodedBytes.length, splitCount); + + for (int i = 0, offset = 0; i < splitCount; i++) { + int length = MESSAGE_MAX_LENGTH_BYTES; + + if ((offset + length) > encodedBytes.length) { + length = encodedBytes.length - offset; + } + + AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount); + + String splitMsgJson = GSON.toJson(splitMsg); + + msgJsonList.add(splitMsgJson); + + offset += length; + } + } + } + } + + if (!msgLengthExceedsLimit) { + msgJsonList.add(msgJson); + } } @@ -158,4 +247,16 @@ public abstract class AbstractNotification implements NotificationInterface { return new JsonParser().parse(src.toString()).getAsJsonArray(); } } + + private static String getNextMessageId() { + String nextMsgIdPrefix = msgIdPrefix; + int nextMsgIdSuffix = msgIdSuffix.getAndIncrement(); + + if (nextMsgIdSuffix == Short.MAX_VALUE) { // get a new UUID after 32,767 IDs + msgIdPrefix = UUID.randomUUID().toString(); + msgIdSuffix = new AtomicInteger(0); + } + + return nextMsgIdPrefix + "_" + Integer.toString(nextMsgIdSuffix); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java new file mode 100644 index 0000000..3b377de --- /dev/null +++ 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + + +import org.apache.atlas.AtlasConfiguration; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.codec.binary.StringUtils; +import org.apache.commons.compress.utils.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + + +public class AtlasNotificationBaseMessage { + private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class); + + public static final int MESSAGE_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 bytes for envelop; + public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean(); + + public enum CompressionKind { NONE, GZIP }; + + private MessageVersion version = null; + private String msgId = null; + private CompressionKind 
msgCompressionKind = CompressionKind.NONE; + private int msgSplitIdx = 1; + private int msgSplitCount = 1; + + + public AtlasNotificationBaseMessage() { + } + + public AtlasNotificationBaseMessage(MessageVersion version) { + this(version, null, CompressionKind.NONE); + } + + public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) { + this.version = version; + this.msgId = msgId; + this.msgCompressionKind = msgCompressionKind; + } + + public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) { + this.version = version; + this.msgId = msgId; + this.msgCompressionKind = msgCompressionKind; + this.msgSplitIdx = msgSplitIdx; + this.msgSplitCount = msgSplitCount; + } + + public void setVersion(MessageVersion version) { + this.version = version; + } + + public MessageVersion getVersion() { + return version; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public CompressionKind getMsgCompressionKind() { + return msgCompressionKind; + } + + public void setMsgCompressed(CompressionKind msgCompressionKind) { + this.msgCompressionKind = msgCompressionKind; + } + + public int getMsgSplitIdx() { + return msgSplitIdx; + } + + public void setMsgSplitIdx(int msgSplitIdx) { + this.msgSplitIdx = msgSplitIdx; + } + + public int getMsgSplitCount() { + return msgSplitCount; + } + + public void setMsgSplitCount(int msgSplitCount) { + this.msgSplitCount = msgSplitCount; + } + + /** + * Compare the version of this message with the given version. + * + * @param compareToVersion the version to compare to + * + * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to, + * or greater than the given version. 
+ */ + public int compareVersion(MessageVersion compareToVersion) { + return version.compareTo(compareToVersion); + } + + + public static byte[] getBytesUtf8(String str) { + return StringUtils.getBytesUtf8(str); + } + + public static String getStringUtf8(byte[] bytes) { + return StringUtils.newStringUtf8(bytes); + } + + public static byte[] encodeBase64(byte[] bytes) { + return Base64.encodeBase64(bytes); + } + + public static byte[] decodeBase64(byte[] bytes) { + return Base64.decodeBase64(bytes); + } + + public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) { + return encodeBase64(gzipCompress(bytes)); + } + + public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) { + return gzipUncompress(decodeBase64(bytes)); + } + + public static String gzipCompress(String str) { + byte[] bytes = getBytesUtf8(str); + byte[] compressedBytes = gzipCompress(bytes); + byte[] encodedBytes = encodeBase64(compressedBytes); + + return getStringUtf8(encodedBytes); + } + + public static String gzipUncompress(String str) { + byte[] encodedBytes = getBytesUtf8(str); + byte[] compressedBytes = decodeBase64(encodedBytes); + byte[] bytes = gzipUncompress(compressedBytes); + + return getStringUtf8(bytes); + } + + public static byte[] gzipCompress(byte[] content) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + try { + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); + + gzipOutputStream.write(content); + gzipOutputStream.close(); + } catch (IOException e) { + LOG.error("gzipCompress(): error compressing {} bytes", content.length, e); + + throw new RuntimeException(e); + } + + return byteArrayOutputStream.toByteArray(); + } + + public static byte[] gzipUncompress(byte[] content) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + try { + IOUtils.copy(new GZIPInputStream(new ByteArrayInputStream(content)), out); + } catch (IOException e) { + LOG.error("gzipUncompress(): error uncompressing {} 
bytes", content.length, e); + } + + return out.toByteArray(); + } +} + http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java new file mode 100644 index 0000000..2f6f9c7 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +/** + * Represents a notification message that is associated with a version. + */ +public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { + + /** + * The actual message. + */ + private final T message; + + + // ----- Constructors ---------------------------------------------------- + + /** + * Create a notification message. 
+ * + * @param version the message version + * @param message the actual message + */ + public AtlasNotificationMessage(MessageVersion version, T message) { + super(version); + + this.message = message; + } + + + public T getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java new file mode 100644 index 0000000..b1ac2fa --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.notification; + +import com.google.gson.Gson; +import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; + +/** + * Deserializer that works with notification messages. The version of each deserialized message is checked against an + * expected version. + */ +public abstract class AtlasNotificationMessageDeserializer<T> implements MessageDeserializer<T> { + private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationMessageDeserializer.class); + + + public static final String VERSION_MISMATCH_MSG = + "Notification message version mismatch. Expected %s but recieved %s. Message %s"; + + private final Type notificationMessageType; + private final Type messageType; + private final MessageVersion expectedVersion; + private final Logger notificationLogger; + private final Gson gson; + + + private final Map<String, AtlasNotificationStringMessage[]> splitMsgBuffer = new HashMap<>(); + + // ----- Constructors ---------------------------------------------------- + + /** + * Create a notification message deserializer. 
+ * + * @param notificationMessageType the type of the notification message + * @param expectedVersion the expected message version + * @param gson JSON serialization/deserialization + * @param notificationLogger logger for message version mismatch + */ + public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, + Gson gson, Logger notificationLogger) { + this.notificationMessageType = notificationMessageType; + this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0]; + this.expectedVersion = expectedVersion; + this.gson = gson; + this.notificationLogger = notificationLogger; + } + + // ----- MessageDeserializer --------------------------------------------- + + @Override + public T deserialize(String messageJson) { + final T ret; + + AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class); + + if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage + ret = gson.fromJson(messageJson, messageType); + } else { + String msgJson = messageJson; + + if (msg.getMsgSplitCount() > 1) { // multi-part message + AtlasNotificationStringMessage splitMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class); + + checkVersion(splitMsg, msgJson); + + String msgId = splitMsg.getMsgId(); + + if (StringUtils.isEmpty(msgId)) { + LOG.error("Received multi-part message with no message ID. Ignoring message"); + + msg = null; + } else { + final int splitIdx = splitMsg.getMsgSplitIdx(); + final int splitCount = splitMsg.getMsgSplitCount(); + + final AtlasNotificationStringMessage[] splitMsgs; + + if (splitIdx == 0) { + splitMsgs = new AtlasNotificationStringMessage[splitCount]; + + splitMsgBuffer.put(msgId, splitMsgs); + } else { + splitMsgs = splitMsgBuffer.get(msgId); + } + + if (splitMsgs == null) { + LOG.error("Received multi-part message: msgID={}, {} of {}, but first message didn't arrive. 
Ignoring message", msgId, splitIdx + 1, splitCount); + + msg = null; + } else if (splitMsgs.length <= splitIdx) { + LOG.error("Received multi-part message: msgID={}, {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount); + + msg = null; + } else { + LOG.info("Received multi-part message: msgID={}, {} of {}", msgId, splitIdx + 1, splitCount); + + splitMsgs[splitIdx] = splitMsg; + + if (splitIdx == (splitCount - 1)) { // last message + splitMsgBuffer.remove(msgId); + + boolean isValidMessage = true; + + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < splitMsgs.length; i++) { + splitMsg = splitMsgs[i]; + + if (splitMsg == null) { + LOG.warn("Multi-part message: msgID={}, message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount); + + isValidMessage = false; + + break; + } + + sb.append(splitMsg.getMessage()); + } + + if (isValidMessage) { + msgJson = sb.toString(); + + if (CompressionKind.GZIP.equals(splitMsg.getMsgCompressionKind())) { + byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson); + byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes); + + msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); + + LOG.info("Received multi-part, compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length); + } else { + byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson); + byte[] bytes = AtlasNotificationBaseMessage.decodeBase64(encodedBytes); + + msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); + + LOG.info("Received multi-part message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length); + } + + msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class); + } else { + msg = null; + } + } else { // more messages to arrive + msg = null; + } + } + } + } + + if (msg != null) { + if 
(CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) { + AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class); + + byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage()); + byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes); + + msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); + + LOG.info("Received compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length); + } + + AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType); + + checkVersion(atlasNotificationMessage, msgJson); + + ret = atlasNotificationMessage.getMessage(); + } else { + ret = null; + } + } + + return ret; + } + + // ----- helper methods -------------------------------------------------- + + /** + * Check the message version against the expected version. + * + * @param notificationMessage the notification message + * @param messageJson the notification message json + * + * @throws IncompatibleVersionException if the message version is incompatable with the expected version + */ + protected void checkVersion(AtlasNotificationBaseMessage notificationMessage, String messageJson) { + int comp = notificationMessage.compareVersion(expectedVersion); + + // message has newer version + if (comp > 0) { + String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson); + + notificationLogger.error(msg); + + throw new IncompatibleVersionException(msg); + } + + // message has older version + if (comp < 0) { + notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson)); + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java 
---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java new file mode 100644 index 0000000..193735c --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.notification; + + +public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage { + private String message = null; + + public AtlasNotificationStringMessage() { + super(AbstractNotification.CURRENT_MESSAGE_VERSION); + } + + public AtlasNotificationStringMessage(String message) { + super(AbstractNotification.CURRENT_MESSAGE_VERSION); + + this.message = message; + } + + public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) { + super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind); + + this.message = message; + } + + public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) { + super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind); + + this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes); + } + + public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) { + super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount); + + this.message = new String(encodedBytes, offset, length); + } + + public void setMessage(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java index 6ef407a..7f96638 100644 --- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java +++ b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java @@ -29,6 +29,9 @@ public class 
MessageVersion implements Comparable<MessageVersion> { * Used for message with no version (old format). */ public static final MessageVersion NO_VERSION = new MessageVersion("0"); + public static final MessageVersion VERSION_1 = new MessageVersion("1.0.0"); + + public static final MessageVersion CURRENT_VERSION = VERSION_1; private final String version; http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index 956c85e..a787862 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -54,9 +54,9 @@ public interface NotificationInterface { * Versioned notification message class types. */ Type HOOK_VERSIONED_MESSAGE_TYPE = - new TypeToken<VersionedMessage<HookNotification.HookNotificationMessage>>(){}.getType(); + new TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType(); - Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<VersionedMessage<EntityNotification>>(){}.getType(); + Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType(); /** * Atlas notification types. 
http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java deleted file mode 100644 index 1929eb4..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/VersionedMessage.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - -/** - * Represents a notification message that is associated with a version. - */ -public class VersionedMessage<T> { - - /** - * The version of the message. - */ - private final MessageVersion version; - - /** - * The actual message. - */ - private final T message; - - - // ----- Constructors ---------------------------------------------------- - - /** - * Create a versioned message. 
- * - * @param version the message version - * @param message the actual message - */ - public VersionedMessage(MessageVersion version, T message) { - this.version = version; - this.message = message; - } - - - // ----- VersionedMessage ------------------------------------------------ - - /** - * Compare the version of this message with the given version. - * - * @param compareToVersion the version to compare to - * - * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to, - * or greater than the given version. - */ - public int compareVersion(MessageVersion compareToVersion) { - return version.compareTo(compareToVersion); - } - - - // ----- accessors ------------------------------------------------------- - - public MessageVersion getVersion() { - return version; - } - - public T getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java deleted file mode 100644 index cc2099e..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - -import com.google.gson.Gson; -import org.slf4j.Logger; - -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; - -/** - * Deserializer that works with versioned messages. The version of each deserialized message is checked against an - * expected version. - */ -public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> { - - public static final String VERSION_MISMATCH_MSG = - "Notification message version mismatch. Expected %s but recieved %s. Message %s"; - - private final Type versionedMessageType; - private final MessageVersion expectedVersion; - private final Logger notificationLogger; - private final Gson gson; - - - // ----- Constructors ---------------------------------------------------- - - /** - * Create a versioned message deserializer. 
- * - * @param versionedMessageType the type of the versioned message - * @param expectedVersion the expected message version - * @param gson JSON serialization/deserialization - * @param notificationLogger logger for message version mismatch - */ - public VersionedMessageDeserializer(Type versionedMessageType, MessageVersion expectedVersion, - Gson gson, Logger notificationLogger) { - this.versionedMessageType = versionedMessageType; - this.expectedVersion = expectedVersion; - this.gson = gson; - this.notificationLogger = notificationLogger; - } - - - // ----- MessageDeserializer --------------------------------------------- - - @Override - public T deserialize(String messageJson) { - VersionedMessage<T> versionedMessage = gson.fromJson(messageJson, versionedMessageType); - - // older style messages not wrapped with VersionedMessage - if (versionedMessage.getVersion() == null) { - Type t = ((ParameterizedType) versionedMessageType).getActualTypeArguments()[0]; - versionedMessage = new VersionedMessage<>(MessageVersion.NO_VERSION, gson.<T>fromJson(messageJson, t)); - } - checkVersion(versionedMessage, messageJson); - - return versionedMessage.getMessage(); - } - - - // ----- helper methods -------------------------------------------------- - - /** - * Check the message version against the expected version. 
- * - * @param versionedMessage the versioned message - * @param messageJson the notification message json - * - * @throws IncompatibleVersionException if the message version is incompatable with the expected version - */ - protected void checkVersion(VersionedMessage<T> versionedMessage, String messageJson) { - int comp = versionedMessage.compareVersion(expectedVersion); - - // message has newer version - if (comp > 0) { - String msg = - String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), messageJson); - notificationLogger.error(msg); - throw new IncompatibleVersionException(msg); - } - - // message has older version - if (comp < 0) { - notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), - messageJson)); - } - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index 9b712f4..08a20bd 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -19,11 +19,8 @@ package org.apache.atlas.kafka; import kafka.message.MessageAndMetadata; -import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.notification.MessageVersion; -import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.IncompatibleVersionException; -import org.apache.atlas.notification.VersionedMessage; +import org.apache.atlas.notification.*; +import org.apache.atlas.notification.AtlasNotificationMessage; import org.apache.atlas.notification.entity.EntityNotificationImplTest; import org.apache.atlas.notification.hook.HookNotification; 
import org.apache.atlas.typesystem.IStruct; @@ -82,7 +79,7 @@ public class KafkaConsumerTest { HookNotification.EntityUpdateRequest message = new HookNotification.EntityUpdateRequest("user1", entity); - String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message)); + String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); List<ConsumerRecord> klist = new ArrayList<>(); @@ -119,7 +116,7 @@ public class KafkaConsumerTest { HookNotification.EntityUpdateRequest message = new HookNotification.EntityUpdateRequest("user1", entity); - String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message)); + String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message)); kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); List<ConsumerRecord> klist = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java index b7474a0..09e2e43 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java @@ -17,20 +17,14 @@ */ package org.apache.atlas.kafka; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.serializer.StringDecoder; -import org.apache.atlas.notification.MessageDeserializer; import org.apache.atlas.notification.NotificationConsumer; import 
org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.testng.annotations.Test; -import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -38,11 +32,9 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.apache.atlas.kafka.AtlasKafkaConsumer; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import scala.actors.threadpool.Arrays; + import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -90,7 +82,7 @@ public class KafkaNotificationMockTest { when(producer.send(expectedRecord)).thenReturn(returnValue); kafkaNotification.sendInternalToProducer(producer, - NotificationInterface.NotificationType.HOOK, new String[]{message}); + NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message})); verify(producer).send(expectedRecord); } @@ -112,7 +104,7 @@ public class KafkaNotificationMockTest { try { kafkaNotification.sendInternalToProducer(producer, - NotificationInterface.NotificationType.HOOK, new String[]{message}); + NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message})); fail("Should have thrown NotificationException"); } catch (NotificationException e) { assertEquals(e.getFailedMessages().size(), 1); @@ -142,7 +134,7 @@ public class KafkaNotificationMockTest { try { kafkaNotification.sendInternalToProducer(producer, - 
NotificationInterface.NotificationType.HOOK, new String[]{message1, message2}); + NotificationInterface.NotificationType.HOOK, Arrays.asList(new String[]{message1, message2})); fail("Should have thrown NotificationException"); } catch (NotificationException e) { assertEquals(e.getFailedMessages().size(), 2); http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index 3b2a093..12f48d1 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -30,7 +30,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; -import static org.mockito.Matchers.endsWith; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; @@ -57,15 +56,15 @@ public class AbstractNotificationConsumerTest { List jsonList = new LinkedList<>(); - jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1))); - jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2))); - jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3))); - jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4))); + jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1))); + jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2))); + jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new 
MessageVersion("1.0.0"), testMessage3))); + jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4))); - Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); + Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType(); NotificationConsumer<TestMessage> consumer = - new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); + new TestNotificationConsumer<>(notificationMessageType, jsonList, logger); List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); @@ -91,9 +90,9 @@ public class AbstractNotificationConsumerTest { List jsonList = new LinkedList<>(); - String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)); - String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2)); - String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3)); + String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)); + String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2)); + String json3 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3)); String json4 = GSON.toJson(testMessage4); jsonList.add(json1); @@ -101,10 +100,10 @@ public class AbstractNotificationConsumerTest { jsonList.add(json3); jsonList.add(json4); - Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); + Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType(); NotificationConsumer<TestMessage> consumer = - new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); + new TestNotificationConsumer<>(notificationMessageType, jsonList, logger); List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); @@ -127,16 +126,16 @@ public class 
AbstractNotificationConsumerTest { List jsonList = new LinkedList<>(); - String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)); - String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2)); + String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)); + String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2)); jsonList.add(json1); jsonList.add(json2); - Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType(); + Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType(); NotificationConsumer<TestMessage> consumer = - new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); + new TestNotificationConsumer<>(notificationMessageType, jsonList, logger); try { List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); @@ -187,8 +186,8 @@ public class AbstractNotificationConsumerTest { private final List<T> messageList; private int index = 0; - public TestNotificationConsumer(Type versionedMessageType, List<T> messages, Logger logger) { - super(new TestDeserializer<T>(versionedMessageType, logger)); + public TestNotificationConsumer(Type notificationMessageType, List<T> messages, Logger logger) { + super(new TestDeserializer<T>(notificationMessageType, logger)); this.messageList = messages; } @@ -222,10 +221,10 @@ public class AbstractNotificationConsumerTest { } } - private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> { + private static final class TestDeserializer<T> extends AtlasNotificationMessageDeserializer<T> { - private TestDeserializer(Type versionedMessageType, Logger logger) { - super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger); + private TestDeserializer(Type notificationMessageType, Logger logger) { + super(notificationMessageType, 
AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger); } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java index 61107a9..4719324 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java @@ -23,6 +23,7 @@ import org.apache.atlas.notification.hook.HookNotification; import org.apache.commons.configuration.Configuration; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -44,17 +45,18 @@ public class AbstractNotificationTest { TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1"); TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1"); - String messageJson1 = AbstractNotification.getMessageJson(message1); - String messageJson2 = AbstractNotification.getMessageJson(message2); - String messageJson3 = AbstractNotification.getMessageJson(message3); + List<String> messageJson = new ArrayList<>(); + AbstractNotification.createNotificationMessages(message1, messageJson); + AbstractNotification.createNotificationMessages(message2, messageJson); + AbstractNotification.createNotificationMessages(message3, messageJson); notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3); assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); - assertEquals(3, notification.messages.length); - assertEquals(messageJson1, notification.messages[0]); - assertEquals(messageJson2, 
notification.messages[1]); - assertEquals(messageJson3, notification.messages[2]); + assertEquals(3, notification.messages.size()); + assertEquals(messageJson.get(0), notification.messages.get(0)); + assertEquals(messageJson.get(1), notification.messages.get(1)); + assertEquals(messageJson.get(2), notification.messages.get(2)); } @Test @@ -72,17 +74,16 @@ public class AbstractNotificationTest { messages.add(message2); messages.add(message3); - String messageJson1 = AbstractNotification.getMessageJson(message1); - String messageJson2 = AbstractNotification.getMessageJson(message2); - String messageJson3 = AbstractNotification.getMessageJson(message3); + List<String> messageJson = new ArrayList<>(); + AbstractNotification.createNotificationMessages(message1, messageJson); + AbstractNotification.createNotificationMessages(message2, messageJson); + AbstractNotification.createNotificationMessages(message3, messageJson); notification.send(NotificationInterface.NotificationType.HOOK, messages); assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); - assertEquals(3, notification.messages.length); - assertEquals(messageJson1, notification.messages[0]); - assertEquals(messageJson2, notification.messages[1]); - assertEquals(messageJson3, notification.messages[2]); + assertEquals(messageJson.size(), notification.messages.size()); + assertEquals(messageJson, notification.messages); } public static class TestMessage extends HookNotification.HookNotificationMessage { @@ -94,14 +95,14 @@ public class AbstractNotificationTest { public static class TestNotification extends AbstractNotification { private NotificationType type; - private String[] messages; + private List<String> messages; public TestNotification(Configuration applicationProperties) throws AtlasException { super(applicationProperties); } @Override - protected void sendInternal(NotificationType notificationType, String[] notificationMessages) + protected void sendInternal(NotificationType 
notificationType, List<String> notificationMessages) throws NotificationException { type = notificationType; http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java new file mode 100644 index 0000000..27b5034 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + +/** + * AtlasNotificationMessage tests. 
+ */ +public class AtlasNotificationMessageTest { + + @Test + public void testGetVersion() throws Exception { + MessageVersion version = new MessageVersion("1.0.0"); + AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version, "a"); + assertEquals(atlasNotificationMessage.getVersion(), version); + } + + @Test + public void testGetMessage() throws Exception { + String message = "a"; + MessageVersion version = new MessageVersion("1.0.0"); + AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version, message); + assertEquals(atlasNotificationMessage.getMessage(), message); + } + + @Test + public void testCompareVersion() throws Exception { + MessageVersion version1 = new MessageVersion("1.0.0"); + MessageVersion version2 = new MessageVersion("2.0.0"); + MessageVersion version3 = new MessageVersion("0.5.0"); + + AtlasNotificationMessage<String> atlasNotificationMessage = new AtlasNotificationMessage<>(version1, "a"); + + assertTrue(atlasNotificationMessage.compareVersion(version1) == 0); + assertTrue(atlasNotificationMessage.compareVersion(version2) < 0); + assertTrue(atlasNotificationMessage.compareVersion(version3) > 0); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java deleted file mode 100644 index 587b7eb..0000000 --- a/notification/src/test/java/org/apache/atlas/notification/VersionedMessageTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - -import org.testng.annotations.Test; - -import static org.testng.Assert.*; - -/** - * VersionedMessage tests. - */ -public class VersionedMessageTest { - - @Test - public void testGetVersion() throws Exception { - MessageVersion version = new MessageVersion("1.0.0"); - VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, "a"); - assertEquals(versionedMessage.getVersion(), version); - } - - @Test - public void testGetMessage() throws Exception { - String message = "a"; - MessageVersion version = new MessageVersion("1.0.0"); - VersionedMessage<String> versionedMessage = new VersionedMessage<>(version, message); - assertEquals(versionedMessage.getMessage(), message); - } - - @Test - public void testCompareVersion() throws Exception { - MessageVersion version1 = new MessageVersion("1.0.0"); - MessageVersion version2 = new MessageVersion("2.0.0"); - MessageVersion version3 = new MessageVersion("0.5.0"); - - VersionedMessage<String> versionedMessage = new VersionedMessage<>(version1, "a"); - - assertTrue(versionedMessage.compareVersion(version1) == 0); - assertTrue(versionedMessage.compareVersion(version2) < 0); - assertTrue(versionedMessage.compareVersion(version3) > 0); - } -} 
http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java index be32427..7b513da 100644 --- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java @@ -24,6 +24,7 @@ import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -48,9 +49,20 @@ public class EntityMessageDeserializerTest { EntityNotificationImpl notification = new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo); - String json = AbstractNotification.getMessageJson(notification); + List<String> jsonMsgList = new ArrayList<>(); + + AbstractNotification.createNotificationMessages(notification, jsonMsgList); + + EntityNotification deserializedNotification = null; + + for (String jsonMsg : jsonMsgList) { + deserializedNotification = deserializer.deserialize(jsonMsg); + + if (deserializedNotification != null) { + break; + } + } - EntityNotification deserializedNotification = deserializer.deserialize(json); assertEquals(deserializedNotification.getOperationType(), notification.getOperationType()); assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId()); assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName()); 
http://git-wip-us.apache.org/repos/asf/atlas/blob/d541a378/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java index 3724fd5..49b877b 100644 --- a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java @@ -20,51 +20,151 @@ package org.apache.atlas.notification.hook; import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.entity.EntityNotificationImplTest; -import org.apache.atlas.typesystem.IStruct; +import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; +import org.apache.commons.lang3.RandomStringUtils; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; /** * HookMessageDeserializer tests. 
*/ public class HookMessageDeserializerTest { + HookMessageDeserializer deserializer = new HookMessageDeserializer(); + @Test public void testDeserialize() throws Exception { - HookMessageDeserializer deserializer = new HookMessageDeserializer(); + Referenceable entity = generateEntityWithTrait(); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + + List<String> jsonMsgList = new ArrayList<>(); + + AbstractNotification.createNotificationMessages(message, jsonMsgList); + + HookNotificationMessage deserializedMessage = deserialize(jsonMsgList); + + assertEqualMessage(deserializedMessage, message); + } + + // validate deserialization of legacy message, which doesn't use MessageVersion + @Test + public void testDeserializeLegacyMessage() throws Exception { + Referenceable entity = generateEntityWithTrait(); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + + String jsonMsg = AbstractNotification.GSON.toJson(message); + HookNotificationMessage deserializedMessage = deserializer.deserialize(jsonMsg); + + assertEqualMessage(deserializedMessage, message); + } + + @Test + public void testDeserializeCompressedMessage() throws Exception { + Referenceable entity = generateLargeEntityWithTrait(); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + + List<String> jsonMsgList = new ArrayList<>(); + + AbstractNotification.createNotificationMessages(message, jsonMsgList); + + assertTrue(jsonMsgList.size() == 1); + + String compressedMsg = jsonMsgList.get(0); + String uncompressedMsg = AbstractNotification.GSON.toJson(message); + + assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")"); + + HookNotificationMessage deserializedMessage = deserialize(jsonMsgList); + + assertEqualMessage(deserializedMessage, message); + } - Referenceable entity = 
EntityNotificationImplTest.getEntity("id"); - String traitName = "MyTrait"; - List<IStruct> traitInfo = new LinkedList<>(); - IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); - traitInfo.add(trait); + @Test + public void testDeserializeSplitMessage() throws Exception { + Referenceable entity = generateVeryLargeEntityWithTrait(); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + + List<String> jsonMsgList = new ArrayList<>(); + + AbstractNotification.createNotificationMessages(message, jsonMsgList); - HookNotification.EntityUpdateRequest message = - new HookNotification.EntityUpdateRequest("user1", entity); + assertTrue(jsonMsgList.size() > 1); - String json = AbstractNotification.getMessageJson(message); + HookNotificationMessage deserializedMessage = deserialize(jsonMsgList); - HookNotification.HookNotificationMessage deserializedMessage = deserializer.deserialize(json); + assertEqualMessage(deserializedMessage, message); + } + + private Referenceable generateEntityWithTrait() { + Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); + return ret; + } + + private HookNotificationMessage deserialize(List<String> jsonMsgList) { + HookNotificationMessage deserializedMessage = null; + + for (String jsonMsg : jsonMsgList) { + deserializedMessage = deserializer.deserialize(jsonMsg); + + if (deserializedMessage != null) { + break; + } + } + + return deserializedMessage; + } + + private void assertEqualMessage(HookNotificationMessage deserializedMessage, EntityUpdateRequest message) throws Exception { + assertNotNull(deserializedMessage); assertEquals(deserializedMessage.getType(), message.getType()); assertEquals(deserializedMessage.getUser(), message.getUser()); - assertTrue(deserializedMessage instanceof HookNotification.EntityUpdateRequest); + assertTrue(deserializedMessage instanceof EntityUpdateRequest); - 
HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest = - (HookNotification.EntityUpdateRequest) deserializedMessage; + EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage; + Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0); + Referenceable entity = message.getEntities().get(0); + String traitName = entity.getTraits().get(0); - Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0); assertEquals(deserializedEntity.getId(), entity.getId()); assertEquals(deserializedEntity.getTypeName(), entity.getTypeName()); assertEquals(deserializedEntity.getTraits(), entity.getTraits()); - assertEquals(deserializedEntity.getTrait(traitName), entity.getTrait(traitName)); + assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode()); + + } + + private Referenceable generateLargeEntityWithTrait() { + Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); + + // add 100 attributes, each with value of size 10k + // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split + String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression + for (int i = 0; i < 100; i++) { + ret.set("attr_" + i, attrValue); + } + + return ret; + } + + private Referenceable generateVeryLargeEntityWithTrait() { + Referenceable ret = EntityNotificationImplTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); + + // add 300 attributes, each with value of size 10k + // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split + for (int i = 0; i < 300; i++) { + ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024)); + } + + return ret; } }
