Repository: atlas
Updated Branches:
  refs/heads/master d541a3786 -> 05514255c


ATLAS-2075: notification enhancement to handle large messages, using 
compression and multi-part messages (#2)

(cherry picked from commit 28941bfe7220fc0cf09d138e79e18d5e0a82c367)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/05514255
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/05514255
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/05514255

Branch: refs/heads/master
Commit: 05514255c6fcb988b68976b2d2d8be95ac80376e
Parents: d541a37
Author: Madhan Neethiraj <[email protected]>
Authored: Sat Sep 30 13:33:30 2017 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Sat Sep 30 15:12:30 2017 -0700

----------------------------------------------------------------------
 .../atlas/notification/AbstractNotification.java      |  9 ++++-----
 .../AtlasNotificationMessageDeserializer.java         | 14 +++++++-------
 2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/05514255/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
 
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 1f9404d..6a70734 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -190,16 +190,13 @@ public abstract class AbstractNotification implements 
NotificationInterface {
 
                 if (msgLengthExceedsLimit) {
                     // compressed messages are already base64-encoded
-                    byte[] encodedBytes = compressionKind != 
CompressionKind.NONE ? msgBytes : 
AtlasNotificationBaseMessage.encodeBase64(msgBytes);
-
-                    int splitCount = encodedBytes.length / 
MESSAGE_MAX_LENGTH_BYTES;
+                    byte[] encodedBytes = MESSAGE_COMPRESSION_ENABLED ? 
msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes);
+                    int    splitCount   = encodedBytes.length / 
MESSAGE_MAX_LENGTH_BYTES;
 
                     if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) 
{
                         splitCount++;
                     }
 
-                    LOG.info("Splitting large message: msgID={}, length={} 
bytes, splitCount={}", msgId, encodedBytes.length, splitCount);
-
                     for (int i = 0, offset = 0; i < splitCount; i++) {
                         int length = MESSAGE_MAX_LENGTH_BYTES;
 
@@ -215,6 +212,8 @@ public abstract class AbstractNotification implements 
NotificationInterface {
 
                         offset += length;
                     }
+
+                    LOG.info("Split large message: msgID={}, splitCount={}, 
length={} bytes", msgId, splitCount, encodedBytes.length);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/05514255/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index b1ac2fa..3d80284 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -107,15 +107,15 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
                     }
 
                     if (splitMsgs == null) {
-                        LOG.error("Received multi-part message: msgID={}, {} 
of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, 
splitCount);
+                        LOG.error("Received msgID={}: {} of {}, but first 
message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount);
 
                         msg = null;
                     } else if (splitMsgs.length <= splitIdx) {
-                        LOG.error("Received multi-part message: msgID={}, {} 
of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
+                        LOG.error("Received msgID={}: {} of {} - out of 
bounds. Ignoring message", msgId, splitIdx + 1, splitCount);
 
                         msg = null;
                     } else {
-                        LOG.info("Received multi-part message: msgID={}, {} of 
{}", msgId, splitIdx + 1, splitCount);
+                        LOG.info("Received msgID={}: {} of {}", msgId, 
splitIdx + 1, splitCount);
 
                         splitMsgs[splitIdx] = splitMsg;
 
@@ -130,7 +130,7 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
                                 splitMsg = splitMsgs[i];
 
                                 if (splitMsg == null) {
-                                    LOG.warn("Multi-part message: msgID={}, 
message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount);
+                                    LOG.warn("MsgID={}: message {} of {} is 
missing. Ignoring message", msgId, i + 1, splitCount);
 
                                     isValidMessage = false;
 
@@ -149,14 +149,14 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
 
                                     msgJson = 
AtlasNotificationBaseMessage.getStringUtf8(bytes);
 
-                                    LOG.info("Received multi-part, compressed 
message: msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, 
encodedBytes.length, bytes.length);
+                                    LOG.info("Received msgID={}: 
splitCount={}, compressed={} bytes, uncompressed={} bytes", msgId, splitCount, 
encodedBytes.length, bytes.length);
                                 } else {
                                     byte[] encodedBytes = 
AtlasNotificationBaseMessage.getBytesUtf8(msgJson);
                                     byte[] bytes        = 
AtlasNotificationBaseMessage.decodeBase64(encodedBytes);
 
                                     msgJson = 
AtlasNotificationBaseMessage.getStringUtf8(bytes);
 
-                                    LOG.info("Received multi-part message: 
msgID={}, compressed={} bytes, uncompressed={} bytes", msgId, 
encodedBytes.length, bytes.length);
+                                    LOG.info("Received msgID={}: 
splitCount={}, length={} bytes", msgId, splitCount, bytes.length);
                                 }
 
                                 msg = gson.fromJson(msgJson, 
AtlasNotificationBaseMessage.class);
@@ -179,7 +179,7 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
 
                     msgJson = 
AtlasNotificationBaseMessage.getStringUtf8(bytes);
 
-                    LOG.info("Received compressed message: msgID={}, 
compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), 
encodedBytes.length, bytes.length);
+                    LOG.info("Received msgID={}: compressed={} bytes, 
uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, 
bytes.length);
                 }
 
                 AtlasNotificationMessage<T> atlasNotificationMessage = 
gson.fromJson(msgJson, notificationMessageType);

Reply via email to