Repository: atlas Updated Branches: refs/heads/branch-0.8 2ee8f3372 -> 9e33e5b0a
ATLAS-2192: notification consumer updates to handle stale split messages Signed-off-by: Madhan Neethiraj <[email protected]> (cherry picked from commit 9a8c71254c70a0e70241c8b9c8892603ecb03e7c) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/9e33e5b0 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/9e33e5b0 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/9e33e5b0 Branch: refs/heads/branch-0.8 Commit: 9e33e5b0ae8eb314d13e7b217e69aa665de11be9 Parents: 2ee8f33 Author: ashutoshm <[email protected]> Authored: Fri Oct 6 15:44:09 2017 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Oct 6 17:53:51 2017 -0700 ---------------------------------------------------------------------- .../org/apache/atlas/AtlasConfiguration.java | 2 + .../java/org/apache/atlas/hook/AtlasHook.java | 1 + .../notification/AbstractNotification.java | 39 ++++++- .../notification/AtlasNotificationMessage.java | 39 ++++++- .../AtlasNotificationMessageDeserializer.java | 101 ++++++++++++++++--- .../AtlasNotificationStringMessage.java | 6 ++ .../notification/NotificationInterface.java | 6 ++ .../notification/SplitMessageAggregator.java | 69 +++++++++++++ .../AbstractNotificationConsumerTest.java | 2 - .../notification/AbstractNotificationTest.java | 26 +++-- .../SplitMessageAggregatorTest.java | 77 ++++++++++++++ 11 files changed, 342 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/common/src/main/java/org/apache/atlas/AtlasConfiguration.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java index 451bd9d..bd2bf7f 100644 --- a/common/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/common/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -35,6 +35,8 @@ public enum AtlasConfiguration { NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)), NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true), + NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60), + NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds", 5 * 60), //search configuration SEARCH_MAX_LIMIT("atlas.search.maxlimit", 10000), http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index a8609e6..4829221 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -85,6 +85,7 @@ public abstract class AtlasHook { notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000); notificationInterface = NotificationProvider.get(); + notificationInterface.setCurrentUser(getUser()); LOG.info("Created Atlas Hook"); } http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 6a70734..4f56bd8 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -31,12 +31,14 @@ import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; import org.codehaus.jettison.json.JSONArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.lang.reflect.Type; +import java.net.Inet4Address; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -64,6 +66,16 @@ public abstract class AbstractNotification implements NotificationInterface { public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode upto 4 bytes in UTF-8 + /** + * IP address of the host in which this process has started + */ + private static String localHostAddress = ""; + + /** + * + */ + private static String currentUser = ""; + private final boolean embedded; private final boolean isHAEnabled; @@ -107,6 +119,11 @@ public abstract class AbstractNotification implements NotificationInterface { send(type, Arrays.asList(messages)); } + @Override + public void setCurrentUser(String user) { + currentUser = user; + } + // ----- AbstractNotification -------------------------------------------- /** @@ -146,6 +163,24 @@ public abstract class AbstractNotification implements NotificationInterface { return GSON.toJson(notificationMsg); } + private static String getHostAddress() { + if (StringUtils.isEmpty(localHostAddress)) { + try { + localHostAddress = Inet4Address.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + LOG.warn("failed to get local host address", e); + + localHostAddress = ""; + } + } + + return localHostAddress; + } + + private static String getCurrentUser() { + return currentUser; + } + /** * Get the notification message JSON from the given object. * @@ -154,7 +189,7 @@ public abstract class AbstractNotification implements NotificationInterface { * @return the message as a JSON string */ public static void createNotificationMessages(Object message, List<String> msgJsonList) { - AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message); + AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser()); String msgJson = GSON.toJson(notificationMsg); boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES; http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java index 2f6f9c7..63d93c9 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java @@ -18,10 +18,16 @@ package org.apache.atlas.notification; +import org.joda.time.DateTimeZone; +import org.joda.time.Instant; + /** * Represents a notification message that is associated with a version. */ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { + private String msgSourceIP; + private String msgCreatedBy; + private long msgCreationTime; /** * The actual message. @@ -38,11 +44,42 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { * @param message the actual message */ public AtlasNotificationMessage(MessageVersion version, T message) { + this(version, message, null, null); + } + + public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) { super(version); - this.message = message; + this.msgSourceIP = msgSourceIP; + this.msgCreatedBy = createdBy; + this.msgCreationTime = Instant.now().toDateTime(DateTimeZone.UTC).getMillis(); + this.message = message; + } + + + public String getMsgSourceIP() { + return msgSourceIP; + } + + public void setMsgSourceIP(String msgSourceIP) { + this.msgSourceIP = msgSourceIP; } + public String getMsgCreatedBy() { + return msgCreatedBy; + } + + public void setMsgCreatedBy(String msgCreatedBy) { + this.msgCreatedBy = msgCreatedBy; + } + + public long getMsgCreationTime() { + return msgCreationTime; + } + + public void setMsgCreationTime(long msgCreationTime) { + this.msgCreationTime = msgCreationTime; + } public T getMessage() { return message; http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java index 3d80284..2a175ba 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java @@ -18,6 +18,7 @@ package org.apache.atlas.notification; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; import org.apache.commons.lang3.StringUtils; @@ -26,8 +27,14 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS; +import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS; /** * Deserializer that works with notification messages. The version of each deserialized message is checked against an @@ -47,8 +54,12 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message private final Gson gson; - private final Map<String, AtlasNotificationStringMessage[]> splitMsgBuffer = new HashMap<>(); - + private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>(); + private final long splitMessageBufferPurgeIntervalMs; + private final long splitMessageSegmentsWaitTimeMs; + private long splitMessagesLastPurgeTime = System.currentTimeMillis(); + private final AtomicLong messageCountTotal = new AtomicLong(0); + private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0); // ----- Constructors ---------------------------------------------------- /** @@ -61,11 +72,22 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message */ public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, Gson gson, Logger notificationLogger) { - this.notificationMessageType = notificationMessageType; - this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0]; - this.expectedVersion = expectedVersion; - this.gson = gson; - this.notificationLogger = notificationLogger; + this(notificationMessageType, expectedVersion, gson, notificationLogger, + NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000, + NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000); + } + + public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, + Gson gson, Logger notificationLogger, + long splitMessageSegmentsWaitTimeMs, + long splitMessageBufferPurgeIntervalMs) { + this.notificationMessageType = notificationMessageType; + this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0]; + this.expectedVersion = expectedVersion; + this.gson = gson; + this.notificationLogger = notificationLogger; + this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs; + this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs; } // ----- MessageDeserializer --------------------------------------------- @@ -74,6 +96,9 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message public T deserialize(String messageJson) { final T ret; + messageCountTotal.incrementAndGet(); + messageCountSinceLastInterval.incrementAndGet(); + AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class); if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage @@ -96,12 +121,12 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message final int splitIdx = splitMsg.getMsgSplitIdx(); final int splitCount = splitMsg.getMsgSplitCount(); - final AtlasNotificationStringMessage[] splitMsgs; + final SplitMessageAggregator splitMsgs; if (splitIdx == 0) { - splitMsgs = new AtlasNotificationStringMessage[splitCount]; + splitMsgs = new SplitMessageAggregator(splitMsg); - splitMsgBuffer.put(msgId, splitMsgs); + splitMsgBuffer.put(splitMsgs.getMsgId(), splitMsgs); } else { splitMsgs = splitMsgBuffer.get(msgId); } @@ -110,24 +135,24 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount); msg = null; - } else if (splitMsgs.length <= splitIdx) { + } else if (splitMsgs.getTotalSplitCount() <= splitIdx) { LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount); msg = null; } else { LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount); - splitMsgs[splitIdx] = splitMsg; + boolean isReady = splitMsgs.add(splitMsg); - if (splitIdx == (splitCount - 1)) { // last message + if (isReady) { // last message splitMsgBuffer.remove(msgId); boolean isValidMessage = true; StringBuilder sb = new StringBuilder(); - for (int i = 0; i < splitMsgs.length; i++) { - splitMsg = splitMsgs[i]; + for (int i = 0; i < splitMsgs.getTotalSplitCount(); i++) { + splitMsg = splitMsgs.get(i); if (splitMsg == null) { LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount); @@ -192,9 +217,55 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message } } + + long now = System.currentTimeMillis(); + long timeSinceLastPurge = now - splitMessagesLastPurgeTime; + + if(timeSinceLastPurge >= splitMessageBufferPurgeIntervalMs) { + purgeStaleMessages(splitMsgBuffer, now, splitMessageSegmentsWaitTimeMs); + + LOG.info("Notification processing stats: total={}, sinceLastStatsReport={}", messageCountTotal.get(), messageCountSinceLastInterval.getAndSet(0)); + + splitMessagesLastPurgeTime = now; + } + return ret; } + @VisibleForTesting + static void purgeStaleMessages(Map<String, SplitMessageAggregator> splitMsgBuffer, long now, long maxWaitTime) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")"); + } + + List<SplitMessageAggregator> evictionList = null; + + for (SplitMessageAggregator aggregrator : splitMsgBuffer.values()) { + long waitTime = now - aggregrator.getFirstSplitTimestamp(); + + if (waitTime < maxWaitTime) { + continue; + } + + if(evictionList == null) { + evictionList = new ArrayList<>(); + } + + evictionList.add(aggregrator); + } + + if(evictionList != null) { + for (SplitMessageAggregator aggregrator : evictionList) { + LOG.error("evicting notification msgID={}, totalSplitCount={}, receivedSplitCount={}", aggregrator.getMsgId(), aggregrator.getTotalSplitCount(), aggregrator.getReceivedSplitCount()); + splitMsgBuffer.remove(aggregrator.getMsgId()); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")"); + } + } + // ----- helper methods -------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java index 193735c..41485a0 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java @@ -38,6 +38,12 @@ public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage this.message = message; } + public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) { + super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount); + + this.message = message; + } + public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) { super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind); http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index a787862..8809225 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -99,6 +99,12 @@ public interface NotificationInterface { } /** + * + * @param user Name of the user under which the processes is running + */ + void setCurrentUser(String user); + + /** * Create notification consumers for the given notification type. * * @param notificationType the notification type (i.e. HOOK, ENTITIES) http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java new file mode 100644 index 0000000..148b57f --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification; + + +public class SplitMessageAggregator { + private final String msgId; + private final AtlasNotificationStringMessage[] splitMessagesBuffer; + private final long firstSplitTimestamp; + + public SplitMessageAggregator(AtlasNotificationStringMessage message) { + msgId = message.getMsgId(); + splitMessagesBuffer = new AtlasNotificationStringMessage[message.getMsgSplitCount()]; + firstSplitTimestamp = System.currentTimeMillis(); + + add(message); + } + + public String getMsgId() { + return msgId; + } + + public long getTotalSplitCount() { + return splitMessagesBuffer.length; + } + + public long getReceivedSplitCount() { + long ret = 0; + + for (AtlasNotificationStringMessage split : splitMessagesBuffer) { + if (split != null) { + ret++; + } + } + + return ret; + } + + public long getFirstSplitTimestamp() { + return firstSplitTimestamp; + } + + public boolean add(AtlasNotificationStringMessage message) { + if (message.getMsgSplitIdx() < splitMessagesBuffer.length) { + splitMessagesBuffer[message.getMsgSplitIdx()] = message; + } + + return message.getMsgSplitIdx() == (message.getMsgSplitCount() - 1); + } + + public AtlasNotificationStringMessage get(int i) { + return splitMessagesBuffer[i]; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index 12f48d1..f313ddc 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -31,10 +31,8 @@ import java.util.List; import java.util.Objects; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import org.apache.kafka.common.TopicPartition; http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java index 4719324..655252c 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java @@ -26,6 +26,7 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Map; import static org.mockito.Mockito.mock; import static org.testng.Assert.*; @@ -54,9 +55,9 @@ public class AbstractNotificationTest { assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); assertEquals(3, notification.messages.size()); - assertEquals(messageJson.get(0), notification.messages.get(0)); - assertEquals(messageJson.get(1), notification.messages.get(1)); - assertEquals(messageJson.get(2), notification.messages.get(2)); + for (int i = 0; i < notification.messages.size(); i++) { + assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i)); + } } @Test @@ -81,9 +82,11 @@ public class AbstractNotificationTest { notification.send(NotificationInterface.NotificationType.HOOK, messages); - assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); - assertEquals(messageJson.size(), notification.messages.size()); - assertEquals(messageJson, notification.messages); + assertEquals(notification.type, NotificationInterface.NotificationType.HOOK); + assertEquals(notification.messages.size(), messageJson.size()); + for (int i = 0; i < notification.messages.size(); i++) { + assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i)); + } } public static class TestMessage extends HookNotification.HookNotificationMessage { @@ -93,6 +96,17 @@ public class AbstractNotificationTest { } } + // ignore msgCreationTime in Json + private void assertEqualsMessageJson(String msgJsonActual, String msgJsonExpected) { + Map<Object, Object> msgActual = AbstractNotification.GSON.fromJson(msgJsonActual, Map.class); + Map<Object, Object> msgExpected = AbstractNotification.GSON.fromJson(msgJsonExpected, Map.class); + + msgActual.remove("msgCreationTime"); + msgExpected.remove("msgCreationTime"); + + assertEquals(msgActual, msgExpected); + } + public static class TestNotification extends AbstractNotification { private NotificationType type; private List<String> messages; http://git-wip-us.apache.org/repos/asf/atlas/blob/9e33e5b0/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java new file mode 100644 index 0000000..0807221 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification; + +import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +public class SplitMessageAggregatorTest { + @Test + public void verifyEviction() throws InterruptedException { + Map<String, SplitMessageAggregator> map = getStringSplitMessageAggregatorMap(); + + Thread.currentThread().sleep(500); + + AtlasNotificationMessageDeserializer.purgeStaleMessages(map, System.currentTimeMillis(), 250); + + Assert.assertEquals(map.size(), 0); + } + + + @Test + public void verifyEvictionDoesNotOccur() throws InterruptedException { + Map<String, SplitMessageAggregator> map = getStringSplitMessageAggregatorMap(); + + int expectedSize = map.size(); + + Thread.currentThread().sleep(500); + + AtlasNotificationMessageDeserializer.purgeStaleMessages(map, System.currentTimeMillis(), Long.MAX_VALUE); + + Assert.assertEquals(map.size(), expectedSize); + } + + private Map<String, SplitMessageAggregator> getStringSplitMessageAggregatorMap() { + Map<String, SplitMessageAggregator> map = new HashMap<>(); + + map.put("1", getSplitMessageAggregator("1", 5)); + map.put("2", getSplitMessageAggregator("2", 10)); + + return map; + } + + private SplitMessageAggregator getSplitMessageAggregator(String id, int splitCount) { + SplitMessageAggregator sma = null; + + for (int i = 0; i < splitCount; i++) { + AtlasNotificationStringMessage sm = new AtlasNotificationStringMessage("aaaaa", id, CompressionKind.NONE, i, splitCount); + + if(sma == null) { + sma = new SplitMessageAggregator(sm); + } else { + sma.add(sm); + } + } + + return sma; + } +}
