This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 7527ca0  ATLAS-2810:- Add notifications from RelationshipStore to 
Kafka ATLAS_ENTITIES topic.
7527ca0 is described below

commit 7527ca06db3f82c981d61df855fd2ebe0ac96094
Author: nikhilbonte <nikhil.bo...@freestoneinfotech.com>
AuthorDate: Fri Jan 4 18:37:01 2019 +0530

    ATLAS-2810:- Add notifications from RelationshipStore to Kafka 
ATLAS_ENTITIES topic.
---
 .../java/org/apache/atlas/AtlasConfiguration.java  |   1 +
 .../atlas/listener/EntityChangeListenerV2.java     |  25 +++
 .../model/instance/AtlasRelationshipHeader.java    | 203 +++++++++++++++++++++
 .../model/notification/EntityNotification.java     |  27 ++-
 .../repository/audit/EntityAuditListenerV2.java    |  24 ++-
 .../store/graph/v2/AtlasEntityChangeNotifier.java  |  54 ++++++
 .../store/graph/v2/AtlasRelationshipStoreV2.java   |  23 ++-
 .../store/graph/v2/EntityGraphRetriever.java       |   4 +-
 .../notification/EntityNotificationListenerV2.java |  55 +++++-
 9 files changed, 398 insertions(+), 18 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java 
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index c5357f5..3ff1316 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -34,6 +34,7 @@ public enum AtlasConfiguration {
     QUERY_PARAM_MAX_LENGTH("atlas.query.param.max.length", 4*1024),
 
     
REST_API_ENABLE_DELETE_TYPE_OVERRIDE("atlas.rest.enable.delete.type.override", 
false),
+    
NOTIFICATION_RELATIONSHIPS_ENABLED("atlas.notification.relationships.enabled", 
false),
 
     NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", 
"ATLAS_HOOK"),
     NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", 
"ATLAS_ENTITIES"),
diff --git 
a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java 
b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
index cccf0d4..106c797 100644
--- a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
+++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
 
 import java.util.List;
 
@@ -96,4 +97,28 @@ public interface EntityChangeListenerV2 {
      * @param entities list of entities to which the term is assigned
      */
     void onTermDeleted(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> 
entities) throws AtlasBaseException;
+
+    /**
+     * This is upon adding new relationships to the repository.
+     *
+     * @param relationships the created relationships
+     * @param isImport
+     */
+    void onRelationshipsAdded(List<AtlasRelationship> relationships, boolean 
isImport) throws AtlasBaseException;
+
+    /**
+     * This is upon updating relationships in the repository.
+     *
+     * @param relationships the updated relationships
+     * @param isImport
+     */
+    void onRelationshipsUpdated(List<AtlasRelationship> relationships, boolean 
isImport) throws AtlasBaseException;
+
+    /**
+     * This is upon deleting relationships from the repository.
+     *
+     * @param relationships the deleted relationships
+     * @param isImport
+     */
+    void onRelationshipsDeleted(List<AtlasRelationship> relationships, boolean 
isImport) throws AtlasBaseException;
 }
\ No newline at end of file
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationshipHeader.java
 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationshipHeader.java
new file mode 100644
index 0000000..c76807a
--- /dev/null
+++ 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationshipHeader.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.instance;
+
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.PList;
+import org.apache.atlas.model.SearchFilter.SortType;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlSeeAlso;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasRelationshipHeader extends AtlasStruct implements 
Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String                                  guid                = null;
+    private AtlasEntity.Status                      status              = 
AtlasEntity.Status.ACTIVE;
+    private AtlasRelationshipDef.PropagateTags      propagateTags       = 
AtlasRelationshipDef.PropagateTags.NONE;
+    private String                                  label               = null;
+    private AtlasObjectId                           end1                = null;
+    private AtlasObjectId                           end2                = null;
+
+    public AtlasRelationshipHeader() {
+
+    }
+
+    public AtlasRelationshipHeader(String typeName, String guid) {
+        super(typeName);
+        setGuid(guid);
+    }
+
+    public AtlasRelationshipHeader(String typeName, String guid, AtlasObjectId 
end1, AtlasObjectId end2) {
+        this(typeName, guid);
+
+        setEnd1(end1);
+        setEnd2(end2);
+    }
+
+    public AtlasRelationshipHeader(String typeName, String guid, AtlasObjectId 
end1, AtlasObjectId end2, AtlasEntity.Status status) {
+        this(typeName, guid, end1, end2);
+        setStatus(status);
+    }
+
+    public AtlasRelationshipHeader(AtlasRelationship relationship) {
+        this(relationship.getTypeName(), relationship.getGuid(), 
relationship.getEnd1(), relationship.getEnd2());
+
+        setLabel(relationship.getLabel());
+        switch (relationship.getStatus()) {
+            case ACTIVE:
+                setStatus(AtlasEntity.Status.ACTIVE);
+                break;
+
+            case DELETED:
+                setStatus(AtlasEntity.Status.DELETED);
+                break;
+        }
+    }
+
+
+    public String getGuid() {
+        return guid;
+    }
+
+    public void setGuid(String guid) {
+        this.guid = guid;
+    }
+
+    public AtlasEntity.Status getStatus() {
+        return status;
+    }
+
+    public void setStatus(AtlasEntity.Status status) {
+        this.status = status;
+    }
+
+    public void setPropagateTags(AtlasRelationshipDef.PropagateTags 
propagateTags) {
+        this.propagateTags = propagateTags;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void setLabel(String label) {
+        this.label = label;
+    }
+
+    public AtlasObjectId getEnd1() {
+        return end1;
+    }
+
+    public void setEnd1(AtlasObjectId end1) {
+        this.end1 = end1;
+    }
+
+    public AtlasObjectId getEnd2() {
+        return end2;
+    }
+
+    public void setEnd2(AtlasObjectId end2) {
+        this.end2 = end2;
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("AtlasRelationshipHeader{");
+        sb.append("guid='").append(guid).append('\'');
+        sb.append(", status=").append(status);
+        sb.append(", label=").append(label);
+        sb.append(", propagateTags=").append(propagateTags);
+        sb.append(", end1=").append(end1);
+        sb.append(", end2=").append(end2);
+        super.toString(sb);
+        sb.append('}');
+
+        return sb;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+        AtlasRelationshipHeader that = (AtlasRelationshipHeader) o;
+        return Objects.equals(guid, that.guid) &&
+                       status == that.status &&
+                       Objects.equals(label, that.label) &&
+                       Objects.equals(propagateTags, that.propagateTags) &&
+                       Objects.equals(end1, that.end1) &&
+                       Objects.equals(end2, that.end2);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), guid, status);
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    /**
+     * REST serialization friendly list.
+     */
+    @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    @XmlSeeAlso(AtlasEntity.class)
+    public static class AtlasRelationshipHeaders extends 
PList<AtlasRelationshipHeader> {
+        private static final long serialVersionUID = 1L;
+
+        public AtlasRelationshipHeaders() {
+            super();
+        }
+
+        public AtlasRelationshipHeaders(List<AtlasRelationshipHeader> list) {
+            super(list);
+        }
+
+        public AtlasRelationshipHeaders(List list, long startIndex, int 
pageSize, long totalCount,
+                                  SortType sortType, String sortBy) {
+            super(list, startIndex, pageSize, totalCount, sortType, sortBy);
+        }
+    }
+}
diff --git 
a/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
 
b/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
index 1eae100..74ea79c 100644
--- 
a/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
+++ 
b/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasRelationshipHeader;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -100,15 +101,21 @@ public class EntityNotification implements Serializable {
 
         public enum OperationType {
             ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE,
-            CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE
+            CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE,
+            RELATIONSHIP_CREATE, RELATIONSHIP_UPDATE, RELATIONSHIP_DELETE
         }
 
         private AtlasEntityHeader entity;
+        private AtlasRelationshipHeader relationship;
         private OperationType     operationType;
         private long              eventTime;
 
         public EntityNotificationV2() {
-            this(null, null, System.currentTimeMillis());
+            super(ENTITY_NOTIFICATION_V2);
+
+            setEntity(null);
+            setOperationType(null);
+            setEventTime(System.currentTimeMillis());
         }
 
         public EntityNotificationV2(AtlasEntityHeader entity, OperationType 
operationType) {
@@ -123,6 +130,14 @@ public class EntityNotification implements Serializable {
             setEventTime(eventTime);
         }
 
+        public EntityNotificationV2(AtlasRelationshipHeader relationship, 
OperationType operationType, long eventTime) {
+            super(ENTITY_NOTIFICATION_V2);
+
+            setRelationship(relationship);
+            setOperationType(operationType);
+            setEventTime(eventTime);
+        }
+
         public AtlasEntityHeader getEntity() {
             return entity;
         }
@@ -131,6 +146,14 @@ public class EntityNotification implements Serializable {
             this.entity = entity;
         }
 
+        public AtlasRelationshipHeader getRelationship() {
+            return relationship;
+        }
+
+        public void setRelationship(AtlasRelationshipHeader relationship) {
+            this.relationship = relationship;
+        }
+
         public OperationType getOperationType() {
             return operationType;
         }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 16d8879..20624da 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -27,6 +27,7 @@ import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
@@ -462,4 +463,25 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
 
         return ret;
     }
-}
\ No newline at end of file
+
+    @Override
+    public void onRelationshipsAdded(List<AtlasRelationship> relationships, 
boolean isImport) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("New relationship(s) added to repository(" + 
relationships.size() + ")");
+        }
+    }
+
+    @Override
+    public void onRelationshipsUpdated(List<AtlasRelationship> relationships, 
boolean isImport) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Relationship(s) updated(" + relationships.size() + ")");
+        }
+    }
+
+    @Override
+    public void onRelationshipsDeleted(List<AtlasRelationship> relationships, 
boolean isImport) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Relationship(s) deleted from repository(" + 
relationships.size() + ")");
+        }
+    }
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
index ca3179a..a6a18dc 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java
@@ -31,8 +31,10 @@ import org.apache.atlas.model.instance.AtlasEntity;
 
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.model.notification.EntityNotification;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
@@ -112,6 +114,24 @@ public class AtlasEntityChangeNotifier {
         notifyPropagatedEntities();
     }
 
+    public void notifyRelationshipMutation(AtlasRelationship relationship, 
EntityNotification.EntityNotificationV2.OperationType operationType) throws 
AtlasBaseException {
+        if (CollectionUtils.isEmpty(entityChangeListeners) || 
instanceConverter == null) {
+            return;
+        }
+
+        switch (operationType) {
+            case RELATIONSHIP_CREATE:
+                
notifyRelationshipListeners(Collections.singletonList(relationship), 
EntityOperation.CREATE, false);
+                break;
+            case RELATIONSHIP_UPDATE:
+                
notifyRelationshipListeners(Collections.singletonList(relationship), 
EntityOperation.UPDATE, false);
+                break;
+            case RELATIONSHIP_DELETE:
+                
notifyRelationshipListeners(Collections.singletonList(relationship), 
EntityOperation.DELETE, false);
+                break;
+        }
+    }
+
     public void onClassificationAddedToEntity(AtlasEntity entity, 
List<AtlasClassification> addedClassifications) throws AtlasBaseException {
         if (isV2EntityNotificationEnabled) {
             doFullTextMapping(entity.getGuid());
@@ -284,6 +304,20 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    private void notifyRelationshipListeners(List<AtlasRelationship> 
relationships, EntityOperation operation, boolean isImport) throws 
AtlasBaseException {
+        if (CollectionUtils.isEmpty(relationships)) {
+            return;
+        }
+
+        if (isV2EntityNotificationEnabled) {
+            notifyV2RelationshipListeners(relationships, operation, isImport);
+            return;
+        }
+
+        LOG.warn("Relationships not supported by v1 notifications. {}", 
relationships);
+    }
+
+
     private void notifyV1Listeners(List<AtlasEntityHeader> entityHeaders, 
EntityOperation operation, boolean isImport) throws AtlasBaseException {
         List<Referenceable> typedRefInsts = toReferenceables(entityHeaders, 
operation);
 
@@ -315,10 +349,12 @@ public class AtlasEntityChangeNotifier {
                 case CREATE:
                     listener.onEntitiesAdded(entities, isImport);
                     break;
+
                 case UPDATE:
                 case PARTIAL_UPDATE:
                     listener.onEntitiesUpdated(entities, isImport);
                     break;
+
                 case DELETE:
                     listener.onEntitiesDeleted(entities, isImport);
                     break;
@@ -326,6 +362,24 @@ public class AtlasEntityChangeNotifier {
         }
     }
 
+    private void notifyV2RelationshipListeners(List<AtlasRelationship> 
relationships, EntityOperation operation, boolean isImport) throws 
AtlasBaseException {
+
+        for (EntityChangeListenerV2 listener : entityChangeListenersV2) {
+            switch (operation) {
+                case CREATE:
+                    listener.onRelationshipsAdded(relationships, isImport);
+                    break;
+                case UPDATE:
+                case PARTIAL_UPDATE:
+                    listener.onRelationshipsUpdated(relationships, isImport);
+                    break;
+                case DELETE:
+                    listener.onRelationshipsDeleted(relationships, isImport);
+                    break;
+            }
+        }
+    }
+
         private List<Referenceable> toReferenceables(List<AtlasEntityHeader> 
entityHeaders, EntityOperation operation) throws AtlasBaseException {
         List<Referenceable> ret = new ArrayList<>(entityHeaders.size());
 
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index 6371f56..47a7546 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -31,6 +31,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
 import 
org.apache.atlas.model.instance.AtlasRelationship.AtlasRelationshipWithExtInfo;
+import 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
 import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
@@ -76,6 +77,7 @@ import static 
org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
 import static 
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
 import static 
org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
 import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
+import static 
org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED;
 
 
 import static 
org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
@@ -92,6 +94,7 @@ public class AtlasRelationshipStoreV2 implements 
AtlasRelationshipStore {
     private static final Logger LOG = 
LoggerFactory.getLogger(AtlasRelationshipStoreV2.class);
 
     private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
+    private boolean notificationsEnabled = 
NOTIFICATION_RELATIONSHIPS_ENABLED.getBoolean();
 
     private final AtlasTypeRegistry         typeRegistry;
     private final EntityGraphRetriever      entityRetriever;
@@ -125,9 +128,6 @@ public class AtlasRelationshipStoreV2 implements 
AtlasRelationshipStore {
             LOG.debug("<== create({}): {}", relationship, ret);
         }
 
-        // notify entities for added/removed classification propagation
-        entityChangeNotifier.notifyPropagatedEntities();
-
         return ret;
     }
 
@@ -195,9 +195,7 @@ public class AtlasRelationshipStoreV2 implements 
AtlasRelationshipStore {
         validateRelationship(end1Vertex, end2Vertex, edgeType, 
relationship.getAttributes());
 
         AtlasRelationship ret = updateRelationship(edge, relationship);
-
-        // notify entities for added/removed classification propagation
-        entityChangeNotifier.notifyPropagatedEntities();
+        sendNotifications(ret, OperationType.RELATIONSHIP_UPDATE);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== update({}): {}", relationship, ret);
@@ -277,8 +275,7 @@ public class AtlasRelationshipStoreV2 implements 
AtlasRelationshipStore {
 
         
deleteDelegate.getHandler().deleteRelationships(Collections.singleton(edge), 
forceDelete);
 
-        // notify entities for added/removed classification propagation
-        entityChangeNotifier.notifyPropagatedEntities();
+        sendNotifications(entityRetriever.mapEdgeToAtlasRelationship(edge), 
OperationType.RELATIONSHIP_DELETE);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("<== deleteById({}): {}", guid);
@@ -408,6 +405,7 @@ public class AtlasRelationshipStoreV2 implements 
AtlasRelationshipStore {
             throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
         }
 
+        sendNotifications(entityRetriever.mapEdgeToAtlasRelationship(ret), 
OperationType.RELATIONSHIP_CREATE);
         return ret;
     }
 
@@ -498,6 +496,7 @@ public class AtlasRelationshipStoreV2 implements 
AtlasRelationshipStore {
     }
 
     // propagated classifications should contain blocked propagated 
classification
+
     private AtlasVertex 
validateBlockedPropagatedClassification(List<AtlasVertex> 
classificationVertices, AtlasClassification classification) {
         AtlasVertex ret = null;
 
@@ -513,7 +512,6 @@ public class AtlasRelationshipStoreV2 implements 
AtlasRelationshipStore {
 
         return ret;
     }
-
     private void addToBlockedClassificationIds(AtlasEdge edge, List<String> 
classificationIds) {
         if (edge != null) {
             if (classificationIds.isEmpty()) {
@@ -876,4 +874,11 @@ public class AtlasRelationshipStoreV2 implements 
AtlasRelationshipStore {
 
         return (attribute != null) ? attribute.getRelationshipEdgeLabel() : 
null;
     }
+
+    private void sendNotifications(AtlasRelationship ret, OperationType 
relationshipUpdate) throws AtlasBaseException {
+        entityChangeNotifier.notifyPropagatedEntities();
+        if (notificationsEnabled){
+            entityChangeNotifier.notifyRelationshipMutation(ret, 
relationshipUpdate);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 7f5e5be..e288cdf 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -1221,8 +1221,8 @@ public class EntityGraphRetriever {
         AtlasVertex end1Vertex = edge.getOutVertex();
         AtlasVertex end2Vertex = edge.getInVertex();
 
-        relationship.setEnd1(new AtlasObjectId(getGuid(end1Vertex), 
getTypeName(end1Vertex)));
-        relationship.setEnd2(new AtlasObjectId(getGuid(end2Vertex), 
getTypeName(end2Vertex)));
+        relationship.setEnd1(new AtlasObjectId(getGuid(end1Vertex), 
getTypeName(end1Vertex), getEntityUniqueAttribute(end1Vertex)));
+        relationship.setEnd2(new AtlasObjectId(getGuid(end2Vertex), 
getTypeName(end2Vertex), getEntityUniqueAttribute(end2Vertex)));
 
         relationship.setLabel(edge.getLabel());
         relationship.setPropagateTags(getPropagateTags(edge));
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
 
b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index ef9ebab..a844bcd 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -26,6 +26,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.instance.AtlasRelationshipHeader;
 import 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2;
 import 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType;
 import org.apache.atlas.type.AtlasClassificationType;
@@ -48,8 +50,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.CLASSIFICATION_ADD;
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.CLASSIFICATION_DELETE;
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.CLASSIFICATION_UPDATE;
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.ENTITY_CREATE;
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.ENTITY_DELETE;
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.ENTITY_UPDATE;
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.RELATIONSHIP_CREATE;
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.RELATIONSHIP_DELETE;
+import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.RELATIONSHIP_UPDATE;
 import static org.apache.atlas.repository.graph.GraphHelper.isInternalType;
-import static 
org.apache.atlas.model.notification.EntityNotification.EntityNotificationV2.OperationType.*;
 import static 
org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.CREATE_TIME;
 import static 
org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.DESCRIPTION;
 import static 
org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever.NAME;
@@ -120,7 +130,6 @@ public class EntityNotificationListenerV2 implements 
EntityChangeListenerV2 {
 
     private void notifyEntityEvents(List<AtlasEntity> entities, OperationType 
operationType) throws AtlasBaseException {
         MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityNotification");
-
         List<EntityNotificationV2> messages = new ArrayList<>();
 
         for (AtlasEntity entity : entities) {
@@ -131,6 +140,27 @@ public class EntityNotificationListenerV2 implements 
EntityChangeListenerV2 {
             messages.add(new 
EntityNotificationV2(toNotificationHeader(entity), operationType, 
RequestContext.get().getRequestTime()));
         }
 
+        sendNotifications(operationType, messages);
+        RequestContext.get().endMetricRecord(metric);
+    }
+
+    private void notifyRelationshipEvents(List<AtlasRelationship> 
relationships, OperationType operationType) throws AtlasBaseException {
+        MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityNotification");
+        List<EntityNotificationV2> messages = new ArrayList<>();
+
+        for (AtlasRelationship relationship : relationships) {
+            if (isInternalType(relationship.getTypeName())) {
+                continue;
+            }
+
+            messages.add(new 
EntityNotificationV2(toNotificationHeader(relationship), operationType, 
RequestContext.get().getRequestTime()));
+        }
+
+        sendNotifications(operationType, messages);
+        RequestContext.get().endMetricRecord(metric);
+    }
+
+    private void sendNotifications(OperationType operationType, 
List<EntityNotificationV2> messages) throws AtlasBaseException {
         if (!messages.isEmpty()) {
             try {
                 notificationSender.send(messages);
@@ -138,8 +168,6 @@ public class EntityNotificationListenerV2 implements 
EntityChangeListenerV2 {
                 throw new 
AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, 
operationType.name());
             }
         }
-
-        RequestContext.get().endMetricRecord(metric);
     }
 
     private AtlasEntityHeader toNotificationHeader(AtlasEntity entity) {
@@ -188,6 +216,10 @@ public class EntityNotificationListenerV2 implements 
EntityChangeListenerV2 {
         return ret;
     }
 
+    private AtlasRelationshipHeader toNotificationHeader(AtlasRelationship 
relationship) {
+        return new AtlasRelationshipHeader(relationship);
+    }
+
     private void setAttribute(AtlasEntityHeader entity, String attrName, 
Object attrValue) {
         if (attrValue != null) {
             entity.setAttribute(attrName, attrValue);
@@ -237,4 +269,19 @@ public class EntityNotificationListenerV2 implements 
EntityChangeListenerV2 {
 
         return ret;
     }
+
+    @Override
+    public void onRelationshipsAdded(List<AtlasRelationship> relationships, 
boolean isImport) throws AtlasBaseException {
+        notifyRelationshipEvents(relationships, RELATIONSHIP_CREATE);
+    }
+
+    @Override
+    public void onRelationshipsUpdated(List<AtlasRelationship> relationships, 
boolean isImport) throws AtlasBaseException {
+        notifyRelationshipEvents(relationships, RELATIONSHIP_UPDATE);
+    }
+
+    @Override
+    public void onRelationshipsDeleted(List<AtlasRelationship> relationships, 
boolean isImport) throws AtlasBaseException {
+        notifyRelationshipEvents(relationships, RELATIONSHIP_DELETE);
+    }
 }
\ No newline at end of file

Reply via email to