Repository: atlas Updated Branches: refs/heads/branch-0.8 99243ee8e -> 28941bfe7
ATLAS-2075: notification enhancement to handle large messages, using compression and multi-part messages (#2) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/28941bfe Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/28941bfe Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/28941bfe Branch: refs/heads/branch-0.8 Commit: 28941bfe7220fc0cf09d138e79e18d5e0a82c367 Parents: 99243ee Author: Madhan Neethiraj <[email protected]> Authored: Sat Sep 30 13:33:30 2017 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Sat Sep 30 14:46:48 2017 -0700 ---------------------------------------------------------------------- .../atlas/notification/AbstractNotification.java | 9 ++++----- .../AtlasNotificationMessageDeserializer.java | 14 +++++++------- 2 files changed, 11 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/28941bfe/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 1f9404d..6a70734 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -190,16 +190,13 @@ public abstract class AbstractNotification implements NotificationInterface { if (msgLengthExceedsLimit) { // compressed messages are already base64-encoded - byte[] encodedBytes = compressionKind != CompressionKind.NONE ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes); - - int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES; + byte[] encodedBytes = MESSAGE_COMPRESSION_ENABLED ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes); + int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES; if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) { splitCount++; } - LOG.info("Splitting large message: msgID={}, length={} bytes, splitCount={}", msgId, encodedBytes.length, splitCount); - for (int i = 0, offset = 0; i < splitCount; i++) { int length = MESSAGE_MAX_LENGTH_BYTES; @@ -215,6 +212,8 @@ public abstract class AbstractNotification implements NotificationInterface { offset += length; } + + LOG.info("Split large message: msgID={}, splitCount={}, length={} bytes", msgId, splitCount, encodedBytes.length); } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/28941bfe/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java index b1ac2fa..3d80284 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java @@ -107,15 +107,15 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message } if (splitMsgs == null) { - LOG.error("Received multi-part message: msgID={}, {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount); + LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount); msg = null; } else if (splitMsgs.length <= splitIdx) { - LOG.error("Received multi-part message: msgID={}, {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount); + LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount); msg = null; } else { - LOG.info("Received multi-part message: msgID={}, {} of {}", msgId, splitIdx + 1, splitCount); + LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount); splitMsgs[splitIdx] = splitMsg; @@ -130,7 +130,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message splitMsg = splitMsgs[i]; if (splitMsg == null) { - LOG.warn("Multi-part message: msgID={}, message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount); + LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount); isValidMessage = false; @@ -149,14 +149,14 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); - LOG.info("Received multi-part, compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length); + LOG.info("Received msgID={}: splitCount={}, compressed={} bytes, uncompressed={} bytes", msgId, splitCount, encodedBytes.length, bytes.length); } else { byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson); byte[] bytes = AtlasNotificationBaseMessage.decodeBase64(encodedBytes); msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); - LOG.info("Received multi-part message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, encodedBytes.length, bytes.length); + LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, bytes.length); } msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class); @@ -179,7 +179,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); - LOG.info("Received compressed message: msgID={}, compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length); + LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length); } AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType);
