This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 0b18b1483 ATLAS-4575 : SDX Version Compatibility - Handle cross
version messages
0b18b1483 is described below
commit 0b18b1483a7980721f9393413a9f18627f82864e
Author: vinayak.marraiya <[email protected]>
AuthorDate: Tue Jun 14 14:14:55 2022 +0530
ATLAS-4575 : SDX Version Compatibility - Handle cross version messages
Signed-off-by: Pinal Shah <[email protected]>
(cherry picked from commit bad730c76de50ae9b4682b1c0a357d70edccbcc9)
---
.../AtlasNotificationMessageDeserializer.java | 13 ++++++++++
.../org/apache/atlas/kafka/KafkaConsumerTest.java | 28 ++++++++++++++++++++++
2 files changed, 41 insertions(+)
diff --git
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index b43bc7c66..207747d7d 100644
---
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -26,6 +26,7 @@ import
org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.model.notification.MessageVersion;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -233,6 +234,7 @@ public abstract class
AtlasNotificationMessageDeserializer<T> implements Message
AtlasNotificationMessage<T> atlasNotificationMessage =
AtlasType.fromV1Json(msgJson, notificationMessageType);
+ checkCrossCombatMessageVersion(atlasNotificationMessage);
checkVersion(atlasNotificationMessage, msgJson);
ret = atlasNotificationMessage.getMessage();
@@ -317,4 +319,15 @@ public abstract class
AtlasNotificationMessageDeserializer<T> implements Message
notificationLogger.info(String.format(VERSION_MISMATCH_MSG,
expectedVersion, notificationMessage.getVersion(), messageJson));
}
}
+
+ protected void checkCrossCombatMessageVersion(AtlasNotificationBaseMessage
notificationMessage) {
+ String sourceVersion = new
MessageSource(this.getClass().getSimpleName()).getVersion();
+ MessageSource notificationSourceVersion =
notificationMessage.getSource();
+
+ if (notificationMessage.getSource() != null) {
+ if
(!StringUtils.equalsIgnoreCase(notificationSourceVersion.getVersion(),
sourceVersion)) {
+ LOG.warn("Hook and Atlas server build versions are not similar
: {}, {}", notificationSourceVersion.getVersion(), sourceVersion);
+ }
+ }
+ }
}
diff --git
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 1af1f3efa..425c8941b 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.atlas.kafka;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.atlas.v1.model.instance.Struct;
import org.apache.atlas.notification.IncompatibleVersionException;
@@ -61,6 +62,9 @@ public class KafkaConsumerTest {
@Mock
private KafkaConsumer kafkaConsumer;
+ @Mock
+ private MessageSource messageSource;
+
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
@@ -170,4 +174,28 @@ public class KafkaConsumerTest {
assertEquals(deserializedEntity.getTraits(), entity.getTraits());
assertEquals(deserializedEntity.getTrait(TRAIT_NAME),
entity.getTrait(TRAIT_NAME));
}
+
+ @Test
+ public void checkCrossCombatMessageVersionTest() throws Exception {
+ Referenceable entity = getEntity(TRAIT_NAME);
+ EntityUpdateRequest message = new
EntityUpdateRequest("user1", entity);
+ when(messageSource.getVersion()).thenReturn("9.9.9");
+ String json = AtlasType.toV1Json(new
AtlasNotificationMessage<>(new MessageVersion("2.0.0"),
message,"","",false,messageSource));
+ TopicPartition tp = new
TopicPartition(ATLAS_HOOK_TOPIC,0);
+ List<ConsumerRecord<String, String>> klist =
Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L,
"mykey", json));
+ Map mp =
Collections.singletonMap(tp,klist);
+ ConsumerRecords records = new ConsumerRecords(mp);
+
+ kafkaConsumer.assign(Collections.singletonList(tp));
+
+ when(kafkaConsumer.poll(100L)).thenReturn(records);
+
+ AtlasKafkaConsumer consumer =new
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer ,false, 100L);
+
+ try {
+ List<AtlasKafkaMessage<HookNotification>> messageList =
consumer.receive();
+ } catch (IncompatibleVersionException e) {
+ e.printStackTrace();
+ }
+ }
}