ATLAS-1907: created RelationshipDefs for all base models and use it when 
creating edges for entities

Signed-off-by: Madhan Neethiraj <mad...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8fe110c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8fe110c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8fe110c3

Branch: refs/heads/master
Commit: 8fe110c3f3c4902d17b082f8431de190fecb0f6f
Parents: b135018
Author: Sarath Subramanian <ssubraman...@hortonworks.com>
Authored: Tue Jul 11 00:40:02 2017 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Tue Jul 11 05:34:02 2017 -0700

----------------------------------------------------------------------
 addons/models/0010-base_model.json              |  42 +++-
 addons/models/0030-hive_model.json              | 102 +++++++-
 addons/models/0050-falcon_model.json            |  61 ++++-
 addons/models/0060-hbase_model.json             |  44 +++-
 addons/models/0080-storm_model.json             |  23 +-
 .../java/org/apache/atlas/AtlasErrorCode.java   |   7 +-
 .../atlas/model/instance/AtlasEntity.java       |  26 +-
 .../atlas/model/instance/AtlasRelationship.java |  19 +-
 .../atlas/model/instance/AtlasStruct.java       |   2 +-
 .../org/apache/atlas/type/AtlasEntityType.java  | 104 ++++++--
 .../atlas/type/AtlasRelationshipType.java       |  45 +++-
 .../org/apache/atlas/type/AtlasStructType.java  |  49 ++--
 .../java/org/apache/atlas/type/AtlasType.java   |   3 +
 .../apache/atlas/type/AtlasTypeRegistry.java    |   4 +
 .../atlas/repository/graph/GraphHelper.java     |  55 ++++-
 .../bootstrap/AtlasTypeDefStoreInitializer.java |  23 ++
 .../store/graph/v1/AtlasGraphUtilsV1.java       |   4 +-
 .../graph/v1/AtlasRelationshipStoreV1.java      | 137 ++++++-----
 .../store/graph/v1/EntityGraphMapper.java       | 243 ++++++++++++++++---
 .../store/graph/v1/EntityGraphRetriever.java    |  97 +++++++-
 .../graph/v1/AtlasRelationshipStoreV1Test.java  |  99 ++++++++
 21 files changed, 1031 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/addons/models/0010-base_model.json
----------------------------------------------------------------------
diff --git a/addons/models/0010-base_model.json 
b/addons/models/0010-base_model.json
index 7f64d85..303f379 100644
--- a/addons/models/0010-base_model.json
+++ b/addons/models/0010-base_model.json
@@ -93,5 +93,45 @@
                 }
             ]
         }
+    ],
+    "relationshipDefs": [
+        {
+            "name": "dataset_process_inputs",
+            "typeVersion": "1.0",
+            "relationshipCategory": "AGGREGATION",
+            "endDef1": {
+                "type": "DataSet",
+                "name": "sourceToProcesses",
+                "isContainer": "false",
+                "cardinality": "SET"
+            },
+            "endDef2": {
+                "type": "Process",
+                "name": "inputs",
+                "isContainer": "true",
+                "cardinality": "SET",
+                "legacyLabel": "__Process.inputs"
+            },
+            "propagateTags": "NONE"
+        },
+        {
+          "name": "process_dataset_outputs",
+          "typeVersion": "1.0",
+          "relationshipCategory": "AGGREGATION",
+          "endDef1": {
+                "type": "Process",
+                "name": "outputs",
+                "isContainer": "true",
+                "cardinality": "SET",
+                "legacyLabel": "__Process.outputs"
+          },
+          "endDef2": {
+                "type": "DataSet",
+                "name": "sinkFromProcesses",
+                "isContainer": "false",
+                "cardinality": "SET"
+          },
+          "propagateTags": "NONE"
+        }
     ]
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/addons/models/0030-hive_model.json
----------------------------------------------------------------------
diff --git a/addons/models/0030-hive_model.json 
b/addons/models/0030-hive_model.json
index b359ea8..a795f0f 100644
--- a/addons/models/0030-hive_model.json
+++ b/addons/models/0030-hive_model.json
@@ -521,5 +521,105 @@
                 }
             ]
         }
+    ],
+    "relationshipDefs": [
+        {
+            "name": "hive_db_tables",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "hive_db",
+                "name": "tables",
+                "isContainer": "true",
+                "cardinality": "SET"
+            },
+            "endDef2": {
+                "type": "hive_table",
+                "name": "db",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__hive_table.db"
+            },
+            "propagateTags": "ONE_TO_TWO"
+        },
+        {
+            "name": "hive_table_columns",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "hive_table",
+                "name": "columns",
+                "isContainer": "true",
+                "cardinality": "SET",
+                "legacyLabel": "__hive_table.columns"
+            },
+            "endDef2": {
+                "type": "hive_column",
+                "name": "table",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__hive_column.table"
+            },
+            "propagateTags": "ONE_TO_TWO"
+        },
+        {
+            "name": "hive_table_partitionkeys",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "hive_table",
+                "name": "partitionKeys",
+                "isContainer": "true",
+                "cardinality": "SET",
+                "legacyLabel": "__hive_table.partitionKeys"
+            },
+            "endDef2": {
+                "type": "hive_column",
+                "name": "table",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__hive_column.table"
+            },
+            "propagateTags": "ONE_TO_TWO"
+        },
+        {
+            "name": "hive_table_storagedesc",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "hive_table",
+                "name": "sd",
+                "isContainer": "true",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__hive_table.sd"
+            },
+            "endDef2": {
+                "type": "hive_storagedesc",
+                "name": "table",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__hive_storagedesc.table"
+            },
+            "propagateTags": "ONE_TO_TWO"
+        },
+        {
+            "name": "hive_process_column_lineage",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "hive_process",
+                "name": "columnLineages",
+                "isContainer": "true",
+                "cardinality": "SET"
+            },
+            "endDef2": {
+                "type": "hive_column_lineage",
+                "name": "query",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__hive_column_lineage.query"
+            },
+            "propagateTags": "NONE"
+        }
     ]
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/addons/models/0050-falcon_model.json
----------------------------------------------------------------------
diff --git a/addons/models/0050-falcon_model.json 
b/addons/models/0050-falcon_model.json
index b7398ef..7755fa8 100644
--- a/addons/models/0050-falcon_model.json
+++ b/addons/models/0050-falcon_model.json
@@ -143,5 +143,64 @@
                 }
             ]
         }
+    ],
+    "relationshipDefs": [
+        {
+            "name": "falcon_feed_cluster",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "falcon_feed",
+                "name": "stored-in",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__falcon_feed.stored-in"
+            },
+            "endDef2": {
+                "type": "falcon_cluster",
+                "name": "feeds",
+                "isContainer": "true",
+                "cardinality": "SET"
+            },
+            "propagateTags": "NONE"
+        },
+        {
+            "name": "falcon_cluster_process",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "falcon_cluster",
+                "name": "processes",
+                "isContainer": "true",
+                "cardinality": "SET"
+            },
+            "endDef2": {
+                "type": "falcon_process",
+                "name": "runs-on",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__falcon_process.runs-on"
+            },
+            "propagateTags": "NONE"
+        },
+        {
+            "name": "falcon_cluster_feed_creation",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "falcon_cluster",
+                "name": "feedCreations",
+                "isContainer": "true",
+                "cardinality": "SET"
+            },
+            "endDef2": {
+                "type": "falcon_feed_creation",
+                "name": "stored-in",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__falcon_feed_creation.stored-in"
+            },
+            "propagateTags": "NONE"
+        }
     ]
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/addons/models/0060-hbase_model.json
----------------------------------------------------------------------
diff --git a/addons/models/0060-hbase_model.json 
b/addons/models/0060-hbase_model.json
index 42fe00c..1d264df 100644
--- a/addons/models/0060-hbase_model.json
+++ b/addons/models/0060-hbase_model.json
@@ -96,5 +96,47 @@
             ],
             "typeVersion": "1.0"
         }
+    ],
+    "relationshipDefs": [
+        {
+            "name": "hbase_table_column_families",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "hbase_table",
+                "name": "column_families",
+                "isContainer": "true",
+                "cardinality": "SET",
+                "legacyLabel": "__hbase_table.column_families"
+            },
+            "endDef2": {
+                "type": "hbase_column_family",
+                "name": "table",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__hbase_column_family.table"
+            },
+            "propagateTags": "ONE_TO_TWO"
+        },
+        {
+            "name": "hbase_column_family_columns",
+            "typeVersion": "1.0",
+            "relationshipCategory": "COMPOSITION",
+            "endDef1": {
+                "type": "hbase_column_family",
+                "name": "columns",
+                "isContainer": "true",
+                "cardinality": "SET",
+                "legacyLabel": "__hbase_column_family.columns"
+            },
+            "endDef2": {
+                "type": "hbase_column",
+                "name": "column_family",
+                "isContainer": "false",
+                "cardinality": "SINGLE",
+                "legacyLabel": "__hbase_column.column_family"
+            },
+            "propagateTags": "ONE_TO_TWO"
+        }
     ]
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/addons/models/0080-storm_model.json
----------------------------------------------------------------------
diff --git a/addons/models/0080-storm_model.json 
b/addons/models/0080-storm_model.json
index 095936e..25360ff 100644
--- a/addons/models/0080-storm_model.json
+++ b/addons/models/0080-storm_model.json
@@ -142,5 +142,26 @@
                 }
             ]
         }
+    ],
+    "relationshipDefs": [
+        {
+            "name": "storm_topology_nodes",
+            "typeVersion": "1.0",
+            "relationshipCategory": "ASSOCIATION",
+            "endDef1": {
+                "type": "storm_topology",
+                "name": "nodes",
+                "isContainer": "false",
+                "cardinality": "SET",
+                "legacyLabel": "__storm_topology.nodes"
+            },
+            "endDef2": {
+                "type": "storm_node",
+                "name": "topolgies",
+                "isContainer": "false",
+                "cardinality": "SET"
+            },
+            "propagateTags": "NONE"
+        }
     ]
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java 
b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index e8afed1..fd1b004 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -80,13 +80,14 @@ public enum AtlasErrorCode {
     RELATIONSHIPDEF_COMPOSITION_MULTIPLE_PARENTS(400,  "ATLAS-400-00-033", 
"COMPOSITION relationshipDef {0} can only have one parent; so cannot have SET 
cardinality on children"),
     RELATIONSHIPDEF_LIST_ON_END(400,  "ATLAS-400-00-034", "relationshipDef {0} 
cannot have a LIST cardinality on an end"),
     RELATIONSHIPDEF_INVALID_END_TYPE(400,  "ATLAS-400-00-035", 
"relationshipDef {0} has invalid end type {1}"),
-    INVALID_RELATIONSHIP_END_TYPE(400, "ATLAS-400-00-036", "invalid update for 
relationshipDef {0}: new end type {1}, existing end type {2}"),
+    INVALID_RELATIONSHIP_END_TYPE(400, "ATLAS-400-00-036", "invalid 
relationshipDef: {0}: end type 1: {1}, end type 2: {2}"),
     RELATIONSHIPDEF_INVALID_END1_UPDATE(400, "ATLAS-400-00-037", "invalid 
update for relationshipDef {0}: new end1 {1}, existing end1 {2}"),
     RELATIONSHIPDEF_INVALID_END2_UPDATE(400, "ATLAS-400-00-038", "invalid 
update for relationshipDef {0}: new end2 {1}, existing end2 {2}"),
     RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE(400, "ATLAS-400-00-039", "invalid  
update for relationship {0}: new relationshipDef category {1}, existing 
relationshipDef category {2}"),
     RELATIONSHIPDEF_INVALID_NAME_UPDATE(400, "ATLAS-400-00-040", "invalid 
relationshipDef rename for relationship guid {0}: new name {1}, existing name 
{2}"),
-    RELATIONSHIPDEF_END1_NAME_INVALID(400, "ATLAS-400-00-020", "{0}: invalid 
end1 name. Name must not contain query keywords"),
-    RELATIONSHIPDEF_END2_NAME_INVALID(400, "ATLAS-400-00-020", "{0}: invalid 
end2 name. Name must not contain query keywords"),
+    RELATIONSHIPDEF_END1_NAME_INVALID(400, "ATLAS-400-00-041", "{0}: invalid 
end1 name. Name must not contain query keywords"),
+    RELATIONSHIPDEF_END2_NAME_INVALID(400, "ATLAS-400-00-042", "{0}: invalid 
end2 name. Name must not contain query keywords"),
+    RELATIONSHIPDEF_NOT_DEFINED(400, "ATLAS-400-00-043", "No relationshipDef 
defined between {0} and {1} on attribute: {2}"),
     // All Not found enums go here
     TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was 
invalid"),
     TYPE_GUID_NOT_FOUND(404, "ATLAS-404-00-002", "Given type guid {0} was 
invalid"),

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
----------------------------------------------------------------------
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 0e277b1..68da6af 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -70,6 +70,7 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
     private Date   updateTime = null;
     private Long   version    = 0L;
 
+    private Map<String, Object>       relationshipAttributes;
     private List<AtlasClassification> classifications;
 
     @JsonIgnore
@@ -170,6 +171,25 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
         this.version = version;
     }
 
+    public Map<String, Object> getRelationshipAttributes() { return 
relationshipAttributes; }
+
+    public void setRelationshipAttributes(Map<String, Object> 
relationshipAttributes) {
+        this.relationshipAttributes = relationshipAttributes;
+    }
+
+    public void addRelationshipAttribute(String name, Object value) {
+        Map<String, Object> r = this.relationshipAttributes;
+
+        if (r != null) {
+            r.put(name, value);
+        } else {
+            r = new HashMap<>();
+            r.put(name, value);
+
+            this.relationshipAttributes = r;
+        }
+    }
+
     public List<AtlasClassification> getClassifications() { return 
classifications; }
 
     public void setClassifications(List<AtlasClassification> classifications) 
{ this.classifications = classifications; }
@@ -204,6 +224,9 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
         dumpDateField(", createTime=", createTime, sb);
         dumpDateField(", updateTime=", updateTime, sb);
         sb.append(", version=").append(version);
+        sb.append(", relationshipAttributes=[");
+        dumpObjects(relationshipAttributes, sb);
+        sb.append("]");
         sb.append(", classifications=[");
         AtlasBaseTypeDef.dumpObjects(classifications, sb);
         sb.append(']');
@@ -227,13 +250,14 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
                 Objects.equals(createTime, that.createTime) &&
                 Objects.equals(updateTime, that.updateTime) &&
                 Objects.equals(version, that.version) &&
+                Objects.equals(relationshipAttributes, 
that.relationshipAttributes) &&
                 Objects.equals(classifications, that.classifications);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), guid, status, createdBy, 
updatedBy, createTime, updateTime, version,
-                            classifications);
+                            relationshipAttributes, classifications);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
----------------------------------------------------------------------
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
index 8d2e7ec..2de9bdf 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasRelationship.java
@@ -50,6 +50,7 @@ public class AtlasRelationship extends AtlasStruct implements 
Serializable {
     private String        guid       = null;
     private AtlasObjectId end1       = null;
     private AtlasObjectId end2       = null;
+    private String        label      = null;
     private Status        status     = Status.ACTIVE;
     private String        createdBy  = null;
     private String        updatedBy  = null;
@@ -81,7 +82,7 @@ public class AtlasRelationship extends AtlasStruct implements 
Serializable {
     public AtlasRelationship(String typeName, AtlasObjectId end1, 
AtlasObjectId end2) {
         super(typeName);
 
-        init(nextInternalId(), end1, end2, null, null, null, null, null, 0L);
+        init(nextInternalId(), end1, end2, null, null, null, null, null, null, 
0L);
     }
 
     public AtlasRelationship(String typeName, String attrName, Object 
attrValue) {
@@ -98,7 +99,7 @@ public class AtlasRelationship extends AtlasStruct implements 
Serializable {
         super(other);
 
         if (other != null) {
-            init(other.guid, other.end1, other.end2, other.status, 
other.createdBy, other.updatedBy,
+            init(other.guid, other.end1, other.end2, other.label, 
other.status, other.createdBy, other.updatedBy,
                  other.createTime, other.updateTime, other.version);
         }
     }
@@ -167,6 +168,10 @@ public class AtlasRelationship extends AtlasStruct 
implements Serializable {
 
     public void setEnd2(AtlasObjectId end2) { this.end2 = end2; }
 
+    public String getLabel() { return label; }
+
+    public void setLabel(String label) { this.label = label; }
+
     private static String nextInternalId() {
         return "-" + Long.toString(s_nextId.getAndIncrement());
     }
@@ -174,15 +179,16 @@ public class AtlasRelationship extends AtlasStruct 
implements Serializable {
     public String getRelationshipLabel() { return "r:" + super.getTypeName(); }
 
     private void init() {
-        init(nextInternalId(), null, null, null, null, null, null, null, 0L);
+        init(nextInternalId(), null, null, null, null, null, null, null, null, 
0L);
     }
 
-    private void init(String guid, AtlasObjectId end1, AtlasObjectId end2,
+    private void init(String guid, AtlasObjectId end1, AtlasObjectId end2, 
String label,
                       Status status, String createdBy, String updatedBy,
                       Date createTime, Date updateTime, Long version) {
         setGuid(guid);
         setEnd1(end1);
         setEnd2(end2);
+        setLabel(label);
         setStatus(status);
         setCreatedBy(createdBy);
         setUpdatedBy(updatedBy);
@@ -202,6 +208,7 @@ public class AtlasRelationship extends AtlasStruct 
implements Serializable {
         sb.append("guid='").append(guid).append('\'');
         sb.append(", end1=").append(end1);
         sb.append(", end2=").append(end2);
+        sb.append(", label='").append(label).append('\'');
         sb.append(", status=").append(status);
         sb.append(", createdBy='").append(createdBy).append('\'');
         sb.append(", updatedBy='").append(updatedBy).append('\'');
@@ -223,6 +230,7 @@ public class AtlasRelationship extends AtlasStruct 
implements Serializable {
         return Objects.equals(guid, that.guid)             &&
                Objects.equals(end1, that.end1)             &&
                Objects.equals(end2, that.end2)             &&
+               Objects.equals(label, that.label)           &&
                status == that.status                       &&
                Objects.equals(createdBy, that.createdBy)   &&
                Objects.equals(updatedBy, that.updatedBy)   &&
@@ -233,7 +241,8 @@ public class AtlasRelationship extends AtlasStruct 
implements Serializable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), guid, end1, end2, status, 
createdBy, updatedBy, createTime, updateTime, version);
+        return Objects.hash(super.hashCode(), guid, end1, end2, label, status, 
createdBy,
+                            updatedBy, createTime, updateTime, version);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
----------------------------------------------------------------------
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
index 7c8ae2d..80f3a66 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
@@ -148,7 +148,7 @@ public class AtlasStruct implements Serializable {
         AtlasStruct that = (AtlasStruct) o;
 
         return Objects.equals(typeName, that.typeName) &&
-            Objects.equals(attributes, that.attributes);
+               Objects.equals(attributes, that.attributes);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 70e3067..e94dd19 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,11 +48,13 @@ public class AtlasEntityType extends AtlasStructType {
 
     private final AtlasEntityDef entityDef;
 
-    private List<AtlasEntityType>       superTypes             = 
Collections.emptyList();
-    private Set<String>                 allSuperTypes          = 
Collections.emptySet();
-    private Set<String>                 allSubTypes            = 
Collections.emptySet();
-    private Set<String>                 typeAndAllSubTypes     = 
Collections.emptySet();
-    private Map<String, AtlasAttribute> relationshipAttributes = 
Collections.emptyMap();
+    private List<AtlasEntityType>                    superTypes                
 = Collections.emptyList();
+    private Set<String>                              allSuperTypes             
 = Collections.emptySet();
+    private Set<String>                              allSubTypes               
 = Collections.emptySet();
+    private Set<String>                              typeAndAllSubTypes        
 = Collections.emptySet();
+    private Set<String>                              typeAndAllSuperTypes      
 = Collections.emptySet();
+    private Map<String, AtlasAttribute>              relationshipAttributes    
 = Collections.emptyMap();
+    private Map<String, List<AtlasRelationshipType>> 
relationshipAttributesType = Collections.emptyMap();
 
     public AtlasEntityType(AtlasEntityDef entityDef) {
         super(entityDef);
@@ -89,15 +92,20 @@ public class AtlasEntityType extends AtlasStructType {
             }
         }
 
-        this.superTypes             = Collections.unmodifiableList(s);
-        this.allSuperTypes          = Collections.unmodifiableSet(allS);
-        this.allAttributes          = Collections.unmodifiableMap(allA);
-        this.uniqAttributes         = getUniqueAttributes(this.allAttributes);
-        this.allSubTypes            = new HashSet<>();   // this will be 
populated in resolveReferencesPhase2()
-        this.typeAndAllSubTypes     = new HashSet<>();   // this will be 
populated in resolveReferencesPhase2()
-        this.relationshipAttributes = new HashMap<>();   // this will be 
populated in resolveReferencesPhase2()
+        this.superTypes                 = Collections.unmodifiableList(s);
+        this.allSuperTypes              = Collections.unmodifiableSet(allS);
+        this.allAttributes              = Collections.unmodifiableMap(allA);
+        this.uniqAttributes             = 
getUniqueAttributes(this.allAttributes);
+        this.allSubTypes                = new HashSet<>(); // this will be 
populated in resolveReferencesPhase2()
+        this.typeAndAllSubTypes         = new HashSet<>(); // this will be 
populated in resolveReferencesPhase2()
+        this.relationshipAttributes     = new HashMap<>(); // this will be 
populated in resolveReferencesPhase3()
+        this.relationshipAttributesType = new HashMap<>(); // this will be 
populated in resolveReferencesPhase3()
 
         this.typeAndAllSubTypes.add(this.getTypeName());
+
+        this.typeAndAllSuperTypes = new HashSet<>(this.allSuperTypes);
+        this.typeAndAllSuperTypes.add(this.getTypeName());
+        this.typeAndAllSuperTypes = 
Collections.unmodifiableSet(this.typeAndAllSuperTypes);
     }
 
     @Override
@@ -110,6 +118,43 @@ public class AtlasEntityType extends AtlasStructType {
         }
     }
 
+    @Override
+    public void resolveReferencesPhase3(AtlasTypeRegistry typeRegistry) throws 
AtlasBaseException {
+        for (AtlasAttributeDef attributeDef : 
getStructDef().getAttributeDefs()) {
+            String          attributeName       = attributeDef.getName();
+            AtlasType       attributeType       = 
typeRegistry.getType(attributeDef.getTypeName());
+            AtlasEntityType attributeEntityType = 
getReferencedEntityType(attributeType);
+
+            // validate if RelationshipDefs is defined for all entityDefs
+            if (attributeEntityType != null && 
!hasRelationshipAttribute(attributeName)) {
+                LOG.warn("No RelationshipDef defined between {} and {} on 
attribute: {}.{}", getTypeName(),
+                          attributeEntityType.getTypeName(), getTypeName(), 
attributeName);
+            }
+        }
+
+        for (String superTypeName : allSuperTypes) {
+            AtlasEntityType superType = 
typeRegistry.getEntityTypeByName(superTypeName);
+
+            Map<String, AtlasAttribute> superTypeRelationshipAttributes = 
superType.getRelationshipAttributes();
+
+            if (MapUtils.isNotEmpty(superTypeRelationshipAttributes)) {
+                relationshipAttributes.putAll(superTypeRelationshipAttributes);
+            }
+
+            Map<String, List<AtlasRelationshipType>> 
superTypeRelationshipAttributesType = superType.getRelationshipAttributesType();
+
+            if (MapUtils.isNotEmpty(superTypeRelationshipAttributesType)) {
+                
relationshipAttributesType.putAll(superTypeRelationshipAttributesType);
+            }
+        }
+
+        allSubTypes                = Collections.unmodifiableSet(allSubTypes);
+        typeAndAllSubTypes         = 
Collections.unmodifiableSet(typeAndAllSubTypes);
+        typeAndAllSuperTypes       = 
Collections.unmodifiableSet(typeAndAllSuperTypes);
+        relationshipAttributes     = 
Collections.unmodifiableMap(relationshipAttributes);
+        relationshipAttributesType = 
Collections.unmodifiableMap(relationshipAttributesType);
+    }
+
     public Set<String> getSuperTypes() {
         return entityDef.getSuperTypes();
     }
@@ -118,9 +163,11 @@ public class AtlasEntityType extends AtlasStructType {
         return allSuperTypes;
     }
 
-    public Set<String> getAllSubTypes() { return 
Collections.unmodifiableSet(allSubTypes); }
+    public Set<String> getAllSubTypes() { return allSubTypes; }
 
-    public Set<String> getTypeAndAllSubTypes() { return 
Collections.unmodifiableSet(typeAndAllSubTypes); }
+    public Set<String> getTypeAndAllSubTypes() { return typeAndAllSubTypes; }
+
+    public Set<String> getTypeAndAllSuperTypes() { return 
typeAndAllSuperTypes; }
 
     public boolean isSuperTypeOf(AtlasEntityType entityType) {
         return entityType != null && 
allSubTypes.contains(entityType.getTypeName());
@@ -142,12 +189,37 @@ public class AtlasEntityType extends AtlasStructType {
         return StringUtils.isNotEmpty(entityTypeName) && 
allSuperTypes.contains(entityTypeName);
     }
 
-    public Map<String, AtlasAttribute> getRelationshipAttributes() { return 
Collections.unmodifiableMap(relationshipAttributes); }
+    public Map<String, AtlasAttribute> getRelationshipAttributes() { return 
relationshipAttributes; }
 
-    public void addRelationshipAttribute(String attributeName, AtlasAttribute 
attribute) {
+    // this method should be called from 
AtlasRelationshipType.resolveReferencesPhase2()
+    void addRelationshipAttribute(String attributeName, AtlasAttribute 
attribute) {
         relationshipAttributes.put(attributeName, attribute);
     }
 
+    // this method should be called from 
AtlasRelationshipType.resolveReferencesPhase2()
+    void addRelationshipAttributeType(String attributeName, 
AtlasRelationshipType relationshipType) {
+        List<AtlasRelationshipType> relationshipTypes = 
relationshipAttributesType.get(attributeName);
+
+        if (relationshipTypes == null) {
+            relationshipTypes = new ArrayList<>();
+            relationshipAttributesType.put(attributeName, relationshipTypes);
+        }
+
+        relationshipTypes.add(relationshipType);
+    }
+
+    public List<AtlasRelationshipType> getRelationshipAttributeType(String 
attributeName) {
+        return relationshipAttributesType.get(attributeName);
+    }
+
+    public Map<String, List<AtlasRelationshipType>> 
getRelationshipAttributesType() {
+        return relationshipAttributesType;
+    }
+
+    public boolean hasRelationshipAttribute(String attributeName) {
+        return relationshipAttributes.containsKey(attributeName);
+    }
+
     @Override
     public AtlasEntity createDefaultValue() {
         AtlasEntity ret = new AtlasEntity(entityDef.getName());

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
----------------------------------------------------------------------
diff --git 
a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
index 49a9493..841b66f 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
@@ -90,9 +90,26 @@ public class AtlasRelationshipType extends AtlasStructType {
     public void resolveReferencesPhase2(AtlasTypeRegistry typeRegistry) throws 
AtlasBaseException {
         super.resolveReferencesPhase2(typeRegistry);
 
-        addRelationshipAttributeToEndType(relationshipDef.getEndDef1(), 
end1Type, end2Type.getTypeName(), typeRegistry);
+        AtlasRelationshipEndDef endDef1           = 
relationshipDef.getEndDef1();
+        AtlasRelationshipEndDef endDef2           = 
relationshipDef.getEndDef2();
+        String                  relationshipLabel = null;
 
-        addRelationshipAttributeToEndType(relationshipDef.getEndDef2(), 
end2Type, end1Type.getTypeName(), typeRegistry);
+        // if legacyLabel is not specified at both ends, use relationshipDef 
name as relationship label.
+        // if legacyLabel is specified in any one end, use it as the 
relationship label for both ends (legacy case).
+        // if legacyLabel is specified at both ends use the respective end's 
legacyLabel as relationship label (legacy case).
+        if (!endDef1.hasLegacyRelation() && !endDef2.hasLegacyRelation()) {
+            relationshipLabel = relationshipDef.getName();
+
+        } else if (endDef1.hasLegacyRelation() && 
!endDef2.hasLegacyRelation()) {
+            relationshipLabel = endDef1.getLegacyLabel();
+
+        } else if (!endDef1.hasLegacyRelation() && 
endDef2.hasLegacyRelation()) {
+            relationshipLabel = endDef2.getLegacyLabel();
+        }
+
+        addRelationshipAttributeToEndType(endDef1, end1Type, 
end2Type.getTypeName(), typeRegistry, relationshipLabel);
+
+        addRelationshipAttributeToEndType(endDef2, end2Type, 
end1Type.getTypeName(), typeRegistry, relationshipLabel);
     }
 
     @Override
@@ -198,10 +215,8 @@ public class AtlasRelationshipType extends AtlasStructType 
{
         }
     }
 
-    private void addRelationshipAttributeToEndType(AtlasRelationshipEndDef 
endDef,
-                                                   AtlasEntityType entityType,
-                                                   String attrTypeName,
-                                                   AtlasTypeRegistry 
typeRegistry) throws AtlasBaseException {
+    private void addRelationshipAttributeToEndType(AtlasRelationshipEndDef 
endDef, AtlasEntityType entityType, String attrTypeName,
+                                                   AtlasTypeRegistry 
typeRegistry, String relationshipLabel) throws AtlasBaseException {
 
         String attrName = (endDef != null) ? endDef.getName() : null;
 
@@ -211,15 +226,29 @@ public class AtlasRelationshipType extends 
AtlasStructType {
 
         AtlasAttribute attribute = entityType.getAttribute(attrName);
 
+        // if relationshipLabel is null, then legacyLabel is mentioned at both 
ends,
+        // use the respective end's legacyLabel as relationshipLabel
+        if (relationshipLabel == null) {
+            relationshipLabel = endDef.getLegacyLabel();
+        }
+
         if (attribute == null) { //attr doesn't exist in type - is a new 
relationship attribute
 
             if (endDef.getCardinality() == Cardinality.SET) {
                 attrTypeName = AtlasBaseTypeDef.getArrayTypeName(attrTypeName);
             }
 
-            attribute = new AtlasAttribute(entityType, new 
AtlasAttributeDef(attrName, attrTypeName), typeRegistry.getType(attrTypeName));
+            attribute = new AtlasAttribute(entityType, new 
AtlasAttributeDef(attrName, attrTypeName),
+                                           typeRegistry.getType(attrTypeName), 
relationshipLabel);
+
+        } else {
+            // attribute already exists (legacy attribute which is also a 
relationship attribute)
+            // add relationshipLabel information to existing attribute
+            attribute.setRelationshipEdgeLabel(relationshipLabel);
         }
 
         entityType.addRelationshipAttribute(attrName, attribute);
+
+        entityType.addRelationshipAttributeType(attrName, this);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
index c2e0be5..f97d767 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
@@ -28,7 +28,6 @@ import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
-import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
@@ -170,9 +169,10 @@ public class AtlasStructType extends AtlasType {
                 continue;
             }
             // Set the inverse reference attribute.
-            AtlasType referencedType = 
typeRegistry.getType(attribute.getAttributeDef().getTypeName());
+            AtlasType       referencedType       = 
typeRegistry.getType(attribute.getAttributeDef().getTypeName());
             AtlasEntityType referencedEntityType = 
getReferencedEntityType(referencedType);
-            AtlasAttribute inverseReference = 
referencedEntityType.getAttribute(attribute.getInverseRefAttributeName());
+            AtlasAttribute  inverseReference     = 
referencedEntityType.getAttribute(attribute.getInverseRefAttributeName());
+
             attribute.setInverseRefAttribute(inverseReference);
          }
     }
@@ -574,7 +574,7 @@ public class AtlasStructType extends AtlasType {
         throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, 
attrName, structDef.getName());
     }
 
-    private AtlasEntityType getReferencedEntityType(AtlasType type) {
+    AtlasEntityType getReferencedEntityType(AtlasType type) {
         if (type instanceof AtlasArrayType) {
             type = ((AtlasArrayType)type).getElementType();
         }
@@ -609,16 +609,17 @@ public class AtlasStructType extends AtlasType {
         private final boolean           isOwnedRef;
         private final String            inverseRefAttributeName;
         private AtlasAttribute          inverseRefAttribute;
-
-        public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef 
attrDef, AtlasType attributeType) {
-            this.definedInType      = definedInType;
-            this.attributeDef       = attrDef;
-            this.attributeType      = attributeType.getTypeForAttribute();
-            this.qualifiedName      = 
getQualifiedAttributeName(definedInType.getStructDef(), attributeDef.getName());
-            this.vertexPropertyName = encodePropertyKey(this.qualifiedName);
-
-            boolean isOwnedRef          = false;
-            String  inverseRefAttribute = null;
+        private String                  relationshipEdgeLabel;
+
+        public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef 
attrDef, AtlasType attributeType, String relationshipLabel) {
+            this.definedInType            = definedInType;
+            this.attributeDef             = attrDef;
+            this.attributeType            = 
attributeType.getTypeForAttribute();
+            this.qualifiedName            = 
getQualifiedAttributeName(definedInType.getStructDef(), attributeDef.getName());
+            this.vertexPropertyName       = 
encodePropertyKey(this.qualifiedName);
+            this.relationshipEdgeLabel    = 
getRelationshipEdgeLabel(relationshipLabel);
+            boolean isOwnedRef            = false;
+            String  inverseRefAttribute   = null;
 
             if (CollectionUtils.isNotEmpty(attributeDef.getConstraints())) {
                 for (AtlasConstraintDef constraint : 
attributeDef.getConstraints()) {
@@ -636,10 +637,14 @@ public class AtlasStructType extends AtlasType {
                 }
             }
 
-            this.isOwnedRef          = isOwnedRef;
+            this.isOwnedRef              = isOwnedRef;
             this.inverseRefAttributeName = inverseRefAttribute;
         }
 
+        public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef 
attrDef, AtlasType attributeType) {
+            this(definedInType, attrDef, attributeType, null);
+        }
+
         public AtlasStructType getDefinedInType() { return definedInType; }
 
         public AtlasStructDef getDefinedInDef() { return 
definedInType.getStructDef(); }
@@ -666,7 +671,15 @@ public class AtlasStructType extends AtlasType {
 
         public AtlasAttribute getInverseRefAttribute() { return 
inverseRefAttribute; }
 
-        public void setInverseRefAttribute(AtlasAttribute inverseAttr) { 
inverseRefAttribute = inverseAttr; };
+        public void setInverseRefAttribute(AtlasAttribute inverseAttr) { 
inverseRefAttribute = inverseAttr; }
+
+        public String getRelationshipEdgeLabel() { return 
relationshipEdgeLabel; }
+
+        public void setRelationshipEdgeLabel(String relationshipEdgeLabel) { 
this.relationshipEdgeLabel = relationshipEdgeLabel; }
+
+        public static String getEdgeLabel(String property) {
+            return "__" + property;
+        }
 
         public static String encodePropertyKey(String key) {
             if (StringUtils.isBlank(key)) {
@@ -692,6 +705,10 @@ public class AtlasStructType extends AtlasType {
             return key;
         }
 
+        private String getRelationshipEdgeLabel(String relationshipLabel) {
+            return (relationshipLabel == null) ? 
getEdgeLabel(vertexPropertyName) : relationshipLabel;
+        }
+
         private static String getQualifiedAttributeName(AtlasStructDef 
structDef, String attrName) {
             final String typeName = structDef.getName();
             return attrName.contains(".") ? attrName : String.format("%s.%s", 
typeName, attrName);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/type/AtlasType.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasType.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
index 86072fe..c99eb7f 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasType.java
@@ -57,6 +57,9 @@ public abstract class AtlasType {
     public void resolveReferencesPhase2(AtlasTypeRegistry typeRegistry) throws 
AtlasBaseException {
     }
 
+    public void resolveReferencesPhase3(AtlasTypeRegistry typeRegistry) throws 
AtlasBaseException {
+    }
+
     public String getTypeName() { return typeName; }
 
     public TypeCategory getTypeCategory() { return typeCategory; }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
index 29fae1c..281422b 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java
@@ -364,6 +364,10 @@ public class AtlasTypeRegistry {
             for (AtlasType type : registryData.allTypes.getAllTypes()) {
                 type.resolveReferencesPhase2(this);
             }
+
+            for (AtlasEntityType entityType : 
registryData.entityDefs.getAllTypes()) {
+                entityType.resolveReferencesPhase3(this);
+            }
         }
 
         public void clear() {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 329dd7a..c5e32d8 100755
--- 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -27,6 +27,7 @@ import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.instance.AtlasRelationship;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
@@ -36,6 +37,8 @@ import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasRelationshipType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.atlas.typesystem.ITypedInstance;
@@ -345,6 +348,10 @@ public final class GraphHelper {
         return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.OUT, 
edgeLabel);
     }
 
+    public Iterator<AtlasEdge> getBothEdgesByLabel(AtlasVertex instanceVertex, 
String edgeLabel) {
+        return getAdjacentEdgesByLabel(instanceVertex, 
AtlasEdgeDirection.BOTH, edgeLabel);
+    }
+
     /**
      * Returns the active edge for the given edge label.
      * If the vertex is deleted and there is no active edge, it returns the 
latest deleted edge
@@ -1223,4 +1230,50 @@ public final class GraphHelper {
 
         return condition.toString();
     }
-}
+
+    /**
+     * Get relationshipDef name from entityType using relationship attribute.
+     * if more than one relationDefs are returned for an attribute.
+     * e.g. hive_column.table
+     *
+     * hive_table.columns       -> hive_column.table
+     * hive_table.partitionKeys -> hive_column.table
+     *
+     * resolve by comparing all incoming edges typename with relationDefs name 
returned for an attribute
+     * to pick the right relationshipDef name
+     */
+    public String getRelationshipDefName(AtlasVertex entityVertex, 
AtlasEntityType entityType, String attributeName) {
+        AtlasRelationshipDef relationshipDef = 
getRelationshipDef(entityVertex, entityType, attributeName);
+
+        return (relationshipDef != null) ? relationshipDef.getName() : null;
+    }
+
+    public AtlasRelationshipDef getRelationshipDef(AtlasVertex entityVertex, 
AtlasEntityType entityType, String attributeName) {
+        List<AtlasRelationshipType> relationshipTypes = 
entityType.getRelationshipAttributeType(attributeName);
+        AtlasRelationshipDef        ret               = null;
+
+        if (relationshipTypes.size() > 1) {
+            Iterator<AtlasEdge> iter = 
entityVertex.getEdges(AtlasEdgeDirection.IN).iterator();
+
+            while (iter.hasNext() && ret == null) {
+                String edgeTypeName = 
AtlasGraphUtilsV1.getTypeName(iter.next());
+
+                for (AtlasRelationshipType relationType : relationshipTypes) {
+                    AtlasRelationshipDef relationshipDef = 
relationType.getRelationshipDef();
+
+                    if (StringUtils.equals(edgeTypeName, 
relationshipDef.getName())) {
+                        ret = relationshipDef;
+
+                        break;
+                    }
+                }
+            }
+
+        } else {
+            //relationshipTypes will have at least one relationshipDef
+            ret = relationshipTypes.get(0).getRelationshipDef();
+        }
+
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 4530f51..b67c50d 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -24,6 +24,7 @@ import org.apache.atlas.model.typedef.AtlasClassificationDef;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasEnumDef;
 import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef;
+import org.apache.atlas.model.typedef.AtlasRelationshipDef;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
@@ -164,6 +165,14 @@ public class AtlasTypeDefStoreInitializer {
             }
         }
 
+        if (CollectionUtils.isNotEmpty(typesDef.getRelationshipDefs())) {
+            for (AtlasRelationshipDef relationshipDef : 
typesDef.getRelationshipDefs()) {
+                if (!typeRegistry.isRegisteredType(relationshipDef.getName())) 
{
+                    typesToCreate.getRelationshipDefs().add(relationshipDef);
+                }
+            }
+        }
+
         return typesToCreate;
     }
 
@@ -234,6 +243,20 @@ public class AtlasTypeDefStoreInitializer {
             }
         }
 
+        if (CollectionUtils.isNotEmpty(typesDef.getRelationshipDefs())) {
+            for (AtlasRelationshipDef relationshipDef : 
typesDef.getRelationshipDefs()) {
+                AtlasRelationshipDef  oldRelationshipDef = 
typeRegistry.getRelationshipDefByName(relationshipDef.getName());
+
+                if (oldRelationshipDef == null) {
+                    continue;
+                }
+
+                if (updateTypeAttributes(oldRelationshipDef, relationshipDef)) 
{
+                    typesToUpdate.getRelationshipDefs().add(relationshipDef);
+                }
+            }
+        }
+
         return typesToUpdate;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index 7b3f1e6..cd9a47a 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -76,8 +76,8 @@ public class AtlasGraphUtilsV1 {
         return vertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
     }
 
-    public static String getTypeName(AtlasVertex instanceVertex) {
-        return instanceVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, 
String.class);
+    public static String getTypeName(AtlasElement element) {
+        return element.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, 
String.class);
     }
 
     public static String getEdgeLabel(String fromNode, String toNode) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
index 8621233..8d9e4be 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasRelationshipStoreV1.java
@@ -23,7 +23,6 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasRelationship;
-import org.apache.atlas.model.typedef.AtlasRelationshipDef;
 import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.RepositoryException;
@@ -47,8 +46,10 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 @Component
@@ -75,18 +76,16 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
 
         validateRelationship(relationship);
 
-        String      relationshipLabel = relationship.getRelationshipLabel();
-        AtlasVertex end1Vertex        = 
getVertexFromEndPoint(relationship.getEnd1());
-        AtlasVertex end2Vertex        = 
getVertexFromEndPoint(relationship.getEnd2());
-
+        AtlasVertex       end1Vertex = 
getVertexFromEndPoint(relationship.getEnd1());
+        AtlasVertex       end2Vertex = 
getVertexFromEndPoint(relationship.getEnd2());
         AtlasRelationship ret;
 
         // create relationship between two vertex
         try {
-            AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, 
end2Vertex, relationshipLabel);
+            AtlasEdge relationshipEdge = getRelationshipEdge(end1Vertex, 
end2Vertex, relationship);
 
             if (relationshipEdge == null) {
-                relationshipEdge = createRelationEdge(end1Vertex, end2Vertex, 
relationship);
+                relationshipEdge = createRelationshipEdge(end1Vertex, 
end2Vertex, relationship);
 
                 AtlasRelationshipType relationType = 
typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
 
@@ -99,9 +98,6 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
                     }
                 }
 
-                // create legacy edges if mentioned in relationDef
-                createLegacyEdges(relationType.getRelationshipDef(), 
end1Vertex, end2Vertex);
-
                 ret = mapEdgeToAtlasRelationship(relationshipEdge);
 
             } else {
@@ -181,49 +177,34 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
         }
 
         String                relationshipName = relationship.getTypeName();
+        String                end1TypeName     = 
getTypeNameFromObjectId(relationship.getEnd1());
+        String                end2TypeName     = 
getTypeNameFromObjectId(relationship.getEnd2());
         AtlasRelationshipType relationshipType = 
typeRegistry.getRelationshipTypeByName(relationshipName);
 
         if (relationshipType == null) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, 
"unknown relationship type'" + relationshipName + "'");
         }
 
-        AtlasObjectId end1 = relationship.getEnd1();
-
-        if (end1 == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"end1 is null");
+        if (relationship.getEnd1() == null || relationship.getEnd2() == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"end1/end2 is null");
         }
 
-        String end1TypeName = end1.getTypeName();
-
-        if (StringUtils.isBlank(end1TypeName)) {
-            end1TypeName = 
AtlasGraphUtilsV1.getTypeNameFromGuid(end1.getGuid());
-        }
+        if (!relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName) 
&&
+                
!relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end1TypeName)) {
 
-        if (!relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end1TypeName)) 
{
             throw new 
AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, 
relationshipName,
-                                         
relationshipType.getEnd1Type().getTypeName(), end1TypeName);
+                                         
relationshipType.getEnd2Type().getTypeName(), end1TypeName);
         }
 
-        AtlasObjectId end2 = relationship.getEnd2();
-
-        if (end2 == null) {
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, 
"end2 is null");
-        }
-
-        String end2TypeName = end2.getTypeName();
-
-        if (StringUtils.isBlank(end2TypeName)) {
-            end2TypeName = 
AtlasGraphUtilsV1.getTypeNameFromGuid(end2.getGuid());
-        }
+        if (!relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName) 
&&
+                
!relationshipType.getEnd1Type().isTypeOrSuperTypeOf(end2TypeName)) {
 
-        if (!relationshipType.getEnd2Type().isTypeOrSuperTypeOf(end2TypeName)) 
{
             throw new 
AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_END_TYPE, 
relationshipName,
-                                         
relationshipType.getEnd2Type().getTypeName(), end2TypeName);
+                                         
relationshipType.getEnd1Type().getTypeName(), end2TypeName);
         }
 
-        validateEnd(end1);
-
-        validateEnd(end2);
+        validateEnd(relationship.getEnd1());
+        validateEnd(relationship.getEnd2());
 
         validateAndNormalize(relationship);
     }
@@ -267,15 +248,16 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
         type.getNormalizedValue(relationship);
     }
 
-    private AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex 
toVertex, String relationshipLabel) {
-        AtlasEdge ret = graphHelper.getEdgeForLabel(fromVertex, 
relationshipLabel);
+    private AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex 
toVertex, AtlasRelationship relationship) {
+        String    relationshipLabel = getRelationshipEdgeLabel(fromVertex, 
toVertex, relationship);
+        AtlasEdge ret               = graphHelper.getEdgeForLabel(fromVertex, 
relationshipLabel);
 
         if (ret != null) {
             AtlasVertex inVertex = ret.getInVertex();
 
             if (inVertex != null) {
                 if 
(!StringUtils.equals(AtlasGraphUtilsV1.getIdFromVertex(inVertex),
-                                       
AtlasGraphUtilsV1.getIdFromVertex(toVertex))) {
+                                        
AtlasGraphUtilsV1.getIdFromVertex(toVertex))) {
                     ret = null;
                 }
             }
@@ -284,7 +266,7 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
         return ret;
     }
 
-    private int getRelationVersion(AtlasRelationship relationship) {
+    private int getRelationshipVersion(AtlasRelationship relationship) {
         Long ret = relationship != null ? relationship.getVersion() : null;
 
         return (ret != null) ? ret.intValue() : DEFAULT_RELATIONSHIP_VERSION;
@@ -305,36 +287,61 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
         return ret;
     }
 
-    private void createLegacyEdges(AtlasRelationshipDef relationshipDef, 
AtlasVertex fromVertex, AtlasVertex toVertex) throws RepositoryException {
-        if (relationshipDef != null) {
-            AtlasRelationshipEndDef endDef1 = relationshipDef.getEndDef1();
-            AtlasRelationshipEndDef endDef2 = relationshipDef.getEndDef2();
+    private AtlasEdge createRelationshipEdge(AtlasVertex fromVertex, 
AtlasVertex toVertex, AtlasRelationship relationship)
+                                             throws RepositoryException {
 
-            if (endDef1 != null && endDef1.hasLegacyRelation()) {
-                graphHelper.getOrCreateEdge(fromVertex, toVertex, 
endDef1.getLegacyLabel());
-            }
+        String    relationshipLabel = getRelationshipEdgeLabel(fromVertex, 
toVertex, relationship);
+        AtlasEdge ret               = graphHelper.getOrCreateEdge(fromVertex, 
toVertex, relationshipLabel);
 
-            if (endDef2 != null && endDef2.hasLegacyRelation()) {
-                graphHelper.getOrCreateEdge(toVertex, fromVertex, 
endDef2.getLegacyLabel());
-            }
-        }
-    }
-
-    private AtlasEdge createRelationEdge(AtlasVertex fromVertex, AtlasVertex 
toVertex, AtlasRelationship relationship) throws RepositoryException {
-        AtlasEdge ret = graphHelper.getOrCreateEdge(fromVertex, toVertex, 
relationship.getRelationshipLabel());
-
-        // add additional properties to edge
+        // map additional properties to relationship edge
         if (ret != null) {
             final String guid = UUID.randomUUID().toString();
 
             AtlasGraphUtilsV1.setProperty(ret, 
Constants.ENTITY_TYPE_PROPERTY_KEY, relationship.getTypeName());
             AtlasGraphUtilsV1.setProperty(ret, Constants.GUID_PROPERTY_KEY, 
guid);
-            AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, 
getRelationVersion(relationship));
+            AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, 
getRelationshipVersion(relationship));
+        }
+
+        return ret;
+    }
+
+    private String getRelationshipEdgeLabel(AtlasVertex fromVertex, 
AtlasVertex toVertex, AtlasRelationship relationship) {
+
+        String                  ret                = 
relationship.getRelationshipLabel();
+        AtlasRelationshipType   relationshipType   = 
typeRegistry.getRelationshipTypeByName(relationship.getTypeName());
+        AtlasRelationshipEndDef endDef1            = 
relationshipType.getRelationshipDef().getEndDef1();
+        AtlasRelationshipEndDef endDef2            = 
relationshipType.getRelationshipDef().getEndDef2();
+        Set<String>             fromVertexTypes    = 
getTypeAndAllSuperTypes(AtlasGraphUtilsV1.getTypeName(fromVertex));
+        Set<String>             toVertexTypes      = 
getTypeAndAllSuperTypes(AtlasGraphUtilsV1.getTypeName(toVertex));
+
+        // validate entity type and all its supertypes contains 
relationshipDefs end type
+        // e.g. [ hive_process -> hive_table] -> [ Process -> DataSet ]
+        if (fromVertexTypes.contains(endDef1.getType()) && 
toVertexTypes.contains(endDef2.getType())) {
+            String         attributeName = endDef1.getName();
+            AtlasAttribute endAttribute  = 
relationshipType.getEnd1Type().getAttribute(attributeName);
+
+            if (endAttribute != null) {
+                ret = endAttribute.getRelationshipEdgeLabel();
+            }
+
+        } else if (fromVertexTypes.contains(endDef2.getType()) && 
toVertexTypes.contains(endDef1.getType())) {
+            String         attributeName = endDef2.getName();
+            AtlasAttribute endAttribute  = 
relationshipType.getEnd2Type().getAttribute(attributeName);
+
+            if (endAttribute != null) {
+                ret = endAttribute.getRelationshipEdgeLabel();
+            }
         }
 
         return ret;
     }
 
+    public Set<String> getTypeAndAllSuperTypes(String entityTypeName) {
+        AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(entityTypeName);
+
+        return (entityType != null) ? entityType.getTypeAndAllSuperTypes() : 
new HashSet<String>();
+    }
+
     private AtlasRelationship mapEdgeToAtlasRelationship(AtlasEdge edge) 
throws AtlasBaseException {
         AtlasRelationship ret = new AtlasRelationship();
 
@@ -368,6 +375,8 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
         relationship.setEnd1(new 
AtlasObjectId(GraphHelper.getGuid(end1Vertex), 
GraphHelper.getTypeName(end1Vertex)));
         relationship.setEnd2(new 
AtlasObjectId(GraphHelper.getGuid(end2Vertex), 
GraphHelper.getTypeName(end2Vertex)));
 
+        relationship.setLabel(edge.getLabel());
+
         return relationship;
     }
 
@@ -388,4 +397,14 @@ public class AtlasRelationshipStoreV1 implements 
AtlasRelationshipStore {
             relationship.setAttribute(attribute.getName(), attrValue);
         }
     }
+
+    private String getTypeNameFromObjectId(AtlasObjectId objectId) {
+        String typeName = objectId.getTypeName();
+
+        if (StringUtils.isBlank(typeName)) {
+            typeName = 
AtlasGraphUtilsV1.getTypeNameFromGuid(objectId.getGuid());
+        }
+
+        return typeName;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fe110c3/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
index ebf6a20..d5c1e86 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java
@@ -27,6 +27,7 @@ import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelationship;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
@@ -37,6 +38,7 @@ import org.apache.atlas.repository.graph.GraphHelper;
 import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
 import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
@@ -67,17 +69,19 @@ import static 
org.apache.atlas.repository.graph.GraphHelper.string;
 public class EntityGraphMapper {
     private static final Logger LOG = 
LoggerFactory.getLogger(EntityGraphMapper.class);
 
-    private final GraphHelper       graphHelper = GraphHelper.getInstance();
-    private final AtlasGraph        graph;
-    private final DeleteHandlerV1   deleteHandler;
-    private final AtlasTypeRegistry typeRegistry;
-
+    private final GraphHelper            graphHelper = 
GraphHelper.getInstance();
+    private final AtlasGraph             graph;
+    private final DeleteHandlerV1        deleteHandler;
+    private final AtlasTypeRegistry      typeRegistry;
+    private final AtlasRelationshipStore relationshipStore;
 
     @Inject
-    public EntityGraphMapper(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry 
typeRegistry, AtlasGraph atlasGraph) {
-        this.deleteHandler = deleteHandler;
-        this.typeRegistry  = typeRegistry;
-        this.graph         = atlasGraph;
+    public EntityGraphMapper(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry 
typeRegistry, AtlasGraph atlasGraph,
+                             AtlasRelationshipStore relationshipStore) {
+        this.deleteHandler     = deleteHandler;
+        this.typeRegistry      = typeRegistry;
+        this.graph             = atlasGraph;
+        this.relationshipStore = relationshipStore;
     }
 
     public AtlasVertex createVertex(AtlasEntity entity) {
@@ -305,9 +309,15 @@ public class EntityGraphMapper {
             }
 
             case OBJECT_ID_TYPE: {
-                String    edgeLabel    = 
AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
-                AtlasEdge currentEdge  = 
graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel);
-                AtlasEdge newEdge      = null;
+                String edgeLabel = 
ctx.getAttribute().getRelationshipEdgeLabel();
+
+                // legacy case - if relationship attribute doesn't exist, use 
legacy edge label.
+                if (StringUtils.isEmpty(edgeLabel)) {
+                    edgeLabel = 
AtlasGraphUtilsV1.getEdgeLabel(ctx.getVertexProperty());
+                }
+
+                AtlasEdge currentEdge = 
graphHelper.getEdgeForLabel(ctx.getReferringVertex(), edgeLabel);
+                AtlasEdge newEdge     = null;
 
                 if (ctx.getValue() != null) {
                     AtlasEntityType instanceType = 
getInstanceType(ctx.getValue());
@@ -316,12 +326,14 @@ public class EntityGraphMapper {
                     ctx.setElementType(instanceType);
                     ctx.setExistingEdge(edge);
 
-                    newEdge = mapObjectIdValue(ctx, context);
+                    newEdge = mapObjectIdValueUsingRelationship(ctx, context);
+
                     if (ctx.getAttribute().getInverseRefAttribute() != null) {
-                        // Update the inverse reference on the target entity
-                        addInverseReference(ctx, 
ctx.getAttribute().getInverseRefAttribute(), newEdge);
+                        // Update the inverse reference using relationship on 
the target entity
+                        
addInverseReference(ctx.getAttribute().getInverseRefAttribute(), newEdge);
                     }
                 }
+
                 if (currentEdge != null && !currentEdge.equals(newEdge)) {
                     deleteHandler.deleteEdgeReference(currentEdge, 
ctx.getAttrType().getTypeCategory(), ctx.getAttribute().isOwnedRef(), true);
                 }
@@ -340,21 +352,15 @@ public class EntityGraphMapper {
         }
     }
 
-    private void addInverseReference(AttributeMutationContext ctx, 
AtlasAttribute inverseAttribute, AtlasEdge edge) throws AtlasBaseException {
+    private void addInverseReference(AtlasAttribute inverseAttribute, 
AtlasEdge edge) throws AtlasBaseException {
+        AtlasStructType inverseType      = inverseAttribute.getDefinedInType();
+        AtlasVertex     inverseVertex    = edge.getInVertex();
+        String          inverseEdgeLabel = 
inverseAttribute.getRelationshipEdgeLabel();
+        AtlasEdge       inverseEdge      = 
graphHelper.getEdgeForLabel(inverseVertex, inverseEdgeLabel);
+        String          propertyName     = 
AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(inverseType, 
inverseAttribute.getName());
 
-        AtlasStructType inverseType = inverseAttribute.getDefinedInType();
-        String propertyName = 
AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(inverseType, 
inverseAttribute.getName());
-        AtlasVertex vertex = edge.getOutVertex();
-        AtlasVertex inverseVertex = edge.getInVertex();
-        String inverseEdgeLabel = AtlasGraphUtilsV1.getEdgeLabel(propertyName);
-        AtlasEdge inverseEdge = graphHelper.getEdgeForLabel(inverseVertex, 
inverseEdgeLabel);
-
-        AtlasEdge newEdge;
-        try {
-            newEdge = graphHelper.getOrCreateEdge(inverseVertex, vertex, 
inverseEdgeLabel);
-        } catch (RepositoryException e) {
-            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
-        }
+        // create new inverse reference
+        AtlasEdge newEdge = 
createInverseReferenceUsingRelationship(inverseAttribute, edge);
 
         boolean inverseUpdated = true;
         switch (inverseAttribute.getAttributeType().getTypeCategory()) {
@@ -363,7 +369,7 @@ public class EntityGraphMapper {
                 if (!inverseEdge.equals(newEdge)) {
                     // Disconnect old reference
                     deleteHandler.deleteEdgeReference(inverseEdge, 
inverseAttribute.getAttributeType().getTypeCategory(),
-                        inverseAttribute.isOwnedRef(), true);
+                                                      
inverseAttribute.isOwnedRef(), true);
                 }
                 else {
                     // Edge already exists for this attribute between these 
vertices.
@@ -401,6 +407,60 @@ public class EntityGraphMapper {
         }
     }
 
+    private AtlasEdge createInverseReferenceUsingRelationship(AtlasAttribute 
inverseAttribute, AtlasEdge edge) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> createInverseReferenceUsingRelationship()");
+        }
+
+        String      inverseAttributeName   = inverseAttribute.getName();
+        AtlasType   inverseAttributeType   = 
inverseAttribute.getDefinedInType();
+        AtlasVertex inverseVertex          = edge.getInVertex();
+        AtlasVertex vertex                 = edge.getOutVertex();
+        AtlasEdge   ret;
+
+        if (inverseAttributeType instanceof AtlasEntityType) {
+            AtlasEntityType entityType = (AtlasEntityType) 
inverseAttributeType;
+
+            if (entityType.hasRelationshipAttribute(inverseAttributeName)) {
+                ret = createRelationship(inverseVertex, vertex, 
inverseAttribute.getRelationshipEdgeLabel());
+
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No RelationshipDef defined between {} and {} on 
attribute: {}", inverseAttributeType,
+                                                        
AtlasGraphUtilsV1.getTypeName(vertex), inverseAttributeName);
+                }
+                // if no RelationshipDef found, use legacy way to create edges
+                ret = createInverseReference(inverseAttribute, 
(AtlasStructType) inverseAttributeType, inverseVertex, vertex);
+            }
+        } else {
+            // inverseAttribute not of type AtlasEntityType, use legacy way to 
create edges
+            ret = createInverseReference(inverseAttribute, (AtlasStructType) 
inverseAttributeType, inverseVertex, vertex);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== createInverseReferenceUsingRelationship()");
+        }
+
+        return ret;
+    }
+
+    // legacy method to create edges for inverse reference
+    private AtlasEdge createInverseReference(AtlasAttribute inverseAttribute, 
AtlasStructType inverseAttributeType,
+                                             AtlasVertex inverseVertex, 
AtlasVertex vertex) throws AtlasBaseException {
+
+        String propertyName     = 
AtlasGraphUtilsV1.getQualifiedAttributePropertyKey(inverseAttributeType, 
inverseAttribute.getName());
+        String inverseEdgeLabel = AtlasGraphUtilsV1.getEdgeLabel(propertyName);
+        AtlasEdge ret;
+
+        try {
+            ret = graphHelper.getOrCreateEdge(inverseVertex, vertex, 
inverseEdgeLabel);
+
+        } catch (RepositoryException e) {
+            throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
+        }
+
+        return ret;
+    }
 
     private Object mapPrimitiveValue(AttributeMutationContext ctx) {
         AtlasGraphUtilsV1.setProperty(ctx.getReferringVertex(), 
ctx.getVertexProperty(), ctx.getValue());
@@ -492,6 +552,63 @@ public class EntityGraphMapper {
         return ret;
     }
 
+    private AtlasEdge 
mapObjectIdValueUsingRelationship(AttributeMutationContext ctx, 
EntityMutationContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> mapObjectIdValueUsingRelationship({})", ctx);
+        }
+
+        AtlasVertex attributeVertex = 
context.getDiscoveryContext().getResolvedEntityVertex(getGuid(ctx.getValue()));
+        AtlasVertex entityVertex    = ctx.getReferringVertex();
+        AtlasEdge   ret;
+
+        if (attributeVertex == null) {
+            AtlasObjectId objectId = getObjectId(ctx.getValue());
+
+            attributeVertex = (objectId != null) ? 
context.getDiscoveryContext().getResolvedEntityVertex(objectId) : null;
+        }
+
+        if (attributeVertex == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
(ctx.getValue() == null ? null : ctx.getValue().toString()));
+        }
+
+        String    attributeName = ctx.getAttribute().getName();
+        AtlasType type          = 
typeRegistry.getType(AtlasGraphUtilsV1.getTypeName(entityVertex));
+
+        if (type instanceof AtlasEntityType) {
+            AtlasEntityType entityType = (AtlasEntityType) type;
+
+            // use relationship to create/update edges
+            if (entityType.hasRelationshipAttribute(attributeName)) {
+                if (ctx.getCurrentEdge() != null) {
+                    ret = updateRelationship(ctx.getCurrentEdge(), 
attributeVertex);
+
+                } else {
+                    String relationshipName = 
graphHelper.getRelationshipDefName(entityVertex, entityType, attributeName);
+                    ret = createRelationship(entityVertex, attributeVertex, 
relationshipName);
+                }
+
+            } else {
+                // use legacy way to create/update edges
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No RelationshipDef defined between {} and {} on 
attribute: {}",  AtlasGraphUtilsV1.getTypeName(entityVertex),
+                               AtlasGraphUtilsV1.getTypeName(attributeVertex), 
attributeName);
+                }
+
+                ret = mapObjectIdValue(ctx, context);
+            }
+
+        } else {
+            // if type is StructType having objectid as attribute
+            ret = mapObjectIdValue(ctx, context);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== mapObjectIdValueUsingRelationship({})", ctx);
+        }
+
+        return ret;
+    }
+
     private Map<String, Object> mapMapValue(AttributeMutationContext ctx, 
EntityMutationContext context) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> mapMapValue({})", ctx);
@@ -536,7 +653,7 @@ public class EntityGraphMapper {
                     // update the inverse reference value.
                     if (isReference && newEntry instanceof AtlasEdge && 
inverseRefAttribute != null) {
                         AtlasEdge newEdge = (AtlasEdge) newEntry;
-                        addInverseReference(mapCtx, inverseRefAttribute, 
newEdge);
+                        addInverseReference(inverseRefAttribute, newEdge);
                     }
                 }
             }
@@ -568,14 +685,14 @@ public class EntityGraphMapper {
             LOG.debug("==> mapArrayValue({})", ctx);
         }
 
-        AtlasAttribute attribute       = ctx.getAttribute();
-        List           newElements     = (List) ctx.getValue();
-        AtlasArrayType arrType         = (AtlasArrayType) 
attribute.getAttributeType();
-        AtlasType      elementType     = arrType.getElementType();
-        List<Object>   currentElements = getArrayElementsProperty(elementType, 
ctx.getReferringVertex(), ctx.getVertexProperty());
-        boolean isReference = AtlasGraphUtilsV1.isReference(elementType);
+        AtlasAttribute attribute           = ctx.getAttribute();
+        List           newElements         = (List) ctx.getValue();
+        AtlasArrayType arrType             = (AtlasArrayType) 
attribute.getAttributeType();
+        AtlasType      elementType         = arrType.getElementType();
+        List<Object>   currentElements     = 
getArrayElementsProperty(elementType, ctx.getReferringVertex(), 
ctx.getVertexProperty());
+        boolean        isReference         = 
AtlasGraphUtilsV1.isReference(elementType);
         AtlasAttribute inverseRefAttribute = 
attribute.getInverseRefAttribute();
-        List<Object> newElementsCreated = new ArrayList<>();
+        List<Object>   newElementsCreated  = new ArrayList<>();
 
         if (CollectionUtils.isNotEmpty(newElements)) {
             for (int index = 0; index < newElements.size(); index++) {
@@ -587,8 +704,9 @@ public class EntityGraphMapper {
                 if (isReference && newEntry instanceof AtlasEdge && 
inverseRefAttribute != null) {
                     // Update the inverse reference value.
                     AtlasEdge newEdge = (AtlasEdge) newEntry;
-                    addInverseReference(arrCtx, inverseRefAttribute, newEdge);
+                    addInverseReference(inverseRefAttribute, newEdge);
                 }
+
                 newElementsCreated.add(newEntry);
             }
         }
@@ -659,7 +777,7 @@ public class EntityGraphMapper {
         case OBJECT_ID_TYPE:
             AtlasEntityType instanceType = getInstanceType(ctx.getValue());
             ctx.setElementType(instanceType);
-            return mapObjectIdValue(ctx, context);
+            return mapObjectIdValueUsingRelationship(ctx, context);
 
         case MAP:
         case ARRAY:
@@ -816,6 +934,29 @@ public class EntityGraphMapper {
         return newEdge;
     }
 
+    private AtlasEdge updateRelationship(AtlasEdge currentEdge, final 
AtlasVertex entityVertex) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Updating entity reference using relationship {} for 
reference attribute {}",  AtlasGraphUtilsV1.getTypeName(entityVertex));
+        }
+
+        String    currentEntityId = 
AtlasGraphUtilsV1.getIdFromVertex(currentEdge.getInVertex());
+        String    newEntityId     = 
AtlasGraphUtilsV1.getIdFromVertex(entityVertex);
+        AtlasEdge ret             = currentEdge;
+
+        if (!currentEntityId.equals(newEntityId) && entityVertex != null) {
+            // create a new relationship edge to the new attribute vertex from 
the instance
+            String relationshipName = 
AtlasGraphUtilsV1.getTypeName(currentEdge);
+
+            if (relationshipName == null) {
+                relationshipName = currentEdge.getLabel();
+            }
+
+            ret = createRelationship(currentEdge.getOutVertex(), entityVertex, 
relationshipName);
+        }
+
+        return ret;
+    }
+
     public static List<Object> getArrayElementsProperty(AtlasType elementType, 
AtlasVertex vertex, String vertexPropertyName) {
         if (AtlasGraphUtilsV1.isReference(elementType)) {
             return (List)vertex.getListProperty(vertexPropertyName, 
AtlasEdge.class);
@@ -1038,4 +1179,26 @@ public class EntityGraphMapper {
             }
         }
     }
+
+    private AtlasEdge createRelationship(AtlasVertex end1Vertex, AtlasVertex 
end2Vertex, String relationshipName) throws AtlasBaseException {
+        AtlasEdge         ret          = null;
+        AtlasObjectId     end1         = new 
AtlasObjectId(AtlasGraphUtilsV1.getIdFromVertex(end1Vertex), 
AtlasGraphUtilsV1.getTypeName(end1Vertex));
+        AtlasObjectId     end2         = new 
AtlasObjectId(AtlasGraphUtilsV1.getIdFromVertex(end2Vertex), 
AtlasGraphUtilsV1.getTypeName(end2Vertex));
+        AtlasRelationship relationship = relationshipStore.create(new 
AtlasRelationship(relationshipName, end1, end2));
+
+        // return newly created AtlasEdge
+        // if multiple edges are returned, compare using id to pick the right 
one
+        Iterator<AtlasEdge> outEdges = 
graphHelper.getOutGoingEdgesByLabel(end1Vertex, relationship.getLabel());
+
+        while (outEdges.hasNext()) {
+            AtlasEdge edge = outEdges.next();
+
+            if 
(AtlasGraphUtilsV1.getIdFromVertex(end2Vertex).equals(AtlasGraphUtilsV1.getIdFromVertex(edge.getInVertex())))
 {
+                ret = edge;
+                break;
+            }
+        }
+
+        return ret;
+    }
 }

Reply via email to