This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 0b18b1483 ATLAS-4575 : SDX Version Compatibility - Handle cross 
version messages
0b18b1483 is described below

commit 0b18b1483a7980721f9393413a9f18627f82864e
Author: vinayak.marraiya <[email protected]>
AuthorDate: Tue Jun 14 14:14:55 2022 +0530

    ATLAS-4575 : SDX Version Compatibility - Handle cross version messages
    
    Signed-off-by: Pinal Shah <[email protected]>
    (cherry picked from commit bad730c76de50ae9b4682b1c0a357d70edccbcc9)
---
 .../AtlasNotificationMessageDeserializer.java      | 13 ++++++++++
 .../org/apache/atlas/kafka/KafkaConsumerTest.java  | 28 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index b43bc7c66..207747d7d 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -26,6 +26,7 @@ import 
org.apache.atlas.model.notification.AtlasNotificationMessage;
 import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.model.notification.MessageVersion;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -233,6 +234,7 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
 
                 AtlasNotificationMessage<T> atlasNotificationMessage = 
AtlasType.fromV1Json(msgJson, notificationMessageType);
 
+                checkCrossCombatMessageVersion(atlasNotificationMessage);
                 checkVersion(atlasNotificationMessage, msgJson);
 
                 ret = atlasNotificationMessage.getMessage();
@@ -317,4 +319,15 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
             notificationLogger.info(String.format(VERSION_MISMATCH_MSG, 
expectedVersion, notificationMessage.getVersion(), messageJson));
         }
     }
+
+    protected void checkCrossCombatMessageVersion(AtlasNotificationBaseMessage 
notificationMessage) {
+        String        sourceVersion             = new 
MessageSource(this.getClass().getSimpleName()).getVersion();
+        MessageSource notificationSourceVersion = 
notificationMessage.getSource();
+
+        if (notificationMessage.getSource() != null) {
+            if 
(!StringUtils.equalsIgnoreCase(notificationSourceVersion.getVersion(), 
sourceVersion)) {
+                LOG.warn("Hook and Atlas server build versions are not similar 
: {}, {}", notificationSourceVersion.getVersion(), sourceVersion);
+            }
+        }
+    }
 }
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 1af1f3efa..425c8941b 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.atlas.kafka;
 
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.MessageSource;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.notification.IncompatibleVersionException;
@@ -61,6 +62,9 @@ public class KafkaConsumerTest {
     @Mock
     private KafkaConsumer kafkaConsumer;
 
+    @Mock
+    private MessageSource messageSource;
+
     @BeforeMethod
     public void setup() {
         MockitoAnnotations.initMocks(this);
@@ -170,4 +174,28 @@ public class KafkaConsumerTest {
         assertEquals(deserializedEntity.getTraits(), entity.getTraits());
         assertEquals(deserializedEntity.getTrait(TRAIT_NAME), 
entity.getTrait(TRAIT_NAME));
     }
+
+    @Test
+    public void checkCrossCombatMessageVersionTest() throws Exception {
+        Referenceable                        entity  = getEntity(TRAIT_NAME);
+        EntityUpdateRequest                  message = new 
EntityUpdateRequest("user1", entity);
+        when(messageSource.getVersion()).thenReturn("9.9.9");
+        String                               json    = AtlasType.toV1Json(new 
AtlasNotificationMessage<>(new MessageVersion("2.0.0"), 
message,"","",false,messageSource));
+        TopicPartition                       tp      = new 
TopicPartition(ATLAS_HOOK_TOPIC,0);
+        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, 
"mykey", json));
+        Map                                  mp      = 
Collections.singletonMap(tp,klist);
+        ConsumerRecords                      records = new ConsumerRecords(mp);
+
+        kafkaConsumer.assign(Collections.singletonList(tp));
+
+        when(kafkaConsumer.poll(100L)).thenReturn(records);
+
+        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer ,false, 100L);
+
+        try {
+            List<AtlasKafkaMessage<HookNotification>> messageList = 
consumer.receive();
+        } catch (IncompatibleVersionException e) {
+            e.printStackTrace();
+        }
+    }
 }

Reply via email to