This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 845a2a8b5561e964bb2c24df61aa6f553969b493
Author: Mandar Ambawane <[email protected]>
AuthorDate: Thu Jul 18 19:15:11 2019 +0530

    ATLAS-3335 Update Sqoop/Storm hook to use relationship attributes
    
    Signed-off-by: Sarath Subramanian <[email protected]>
    (cherry picked from commit 74bfe9478ed26d917d5da77e02309d8b7b4299f9)
---
 .../main/java/org/apache/atlas/sqoop/hook/SqoopHook.java   | 14 +++++++++-----
 .../java/org/apache/atlas/storm/hook/StormAtlasHook.java   | 12 ++++++++----
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 3ccd426..25ab7cb 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -70,6 +70,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
     public static final String OUTPUTS        = "outputs";
     public static final String ATTRIBUTE_DB   = "db";
 
+    public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
+    public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = 
"dataset_process_inputs";
+    public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = 
"process_dataset_outputs";
+
     private static final AtlasHookImpl atlasHook;
 
     static {
@@ -129,7 +133,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
 
         entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase());
         entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
qualifiedName);
-        entHiveTable.setAttribute(ATTRIBUTE_DB, 
AtlasTypeUtil.getAtlasObjectId(entHiveDb));
+        entHiveTable.setRelationshipAttribute(ATTRIBUTE_DB, 
AtlasTypeUtil.getAtlasRelatedObjectId(entHiveDb, RELATIONSHIP_HIVE_TABLE_DB));
 
         return entHiveTable;
     }
@@ -179,11 +183,11 @@ public class SqoopHook extends SqoopJobDataPublisher {
         List<AtlasObjectId> hiveObjects  = 
Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? 
entHiveTable : entHiveDb));
 
         if (isImportOperation(data)) {
-            entProcess.setAttribute(SqoopHook.INPUTS, sqoopObjects);
-            entProcess.setAttribute(SqoopHook.OUTPUTS, hiveObjects);
+            entProcess.setRelationshipAttribute(SqoopHook.INPUTS, 
AtlasTypeUtil.getAtlasRelatedObjectIdList(sqoopObjects, 
RELATIONSHIP_DATASET_PROCESS_INPUTS));
+            entProcess.setRelationshipAttribute(SqoopHook.OUTPUTS, 
AtlasTypeUtil.getAtlasRelatedObjectIdList(hiveObjects, 
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
         } else {
-            entProcess.setAttribute(SqoopHook.INPUTS, hiveObjects);
-            entProcess.setAttribute(SqoopHook.OUTPUTS, sqoopObjects);
+            entProcess.setRelationshipAttribute(SqoopHook.INPUTS, 
AtlasTypeUtil.getAtlasRelatedObjectIdList(hiveObjects, 
RELATIONSHIP_DATASET_PROCESS_INPUTS));
+            entProcess.setRelationshipAttribute(SqoopHook.OUTPUTS, 
AtlasTypeUtil.getAtlasRelatedObjectIdList(sqoopObjects, 
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
         }
 
         entProcess.setAttribute(SqoopHook.USER, data.getUser());
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 517a3c3..cdfdefc 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -67,6 +67,10 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
     public  static final String HBASE_NAMESPACE_DEFAULT = "default";
     public  static final String ATTRIBUTE_DB            = "db";
 
+    public static final String RELATIONSHIP_STORM_TOPOLOGY_NODES = 
"storm_topology_nodes";
+    public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = 
"dataset_process_inputs";
+    public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = 
"process_dataset_outputs";
+
     /**
      * This is the client-side hook that storm fires when a topology is added.
      *
@@ -90,7 +94,7 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
 
             if (CollectionUtils.isNotEmpty(graphNodes)) {
                 // add the connection from topology to the graph
-                topology.setAttribute("nodes", 
AtlasTypeUtil.getAtlasObjectIds(graphNodes));
+                topology.setRelationshipAttribute("nodes", 
AtlasTypeUtil.getAtlasRelatedObjectIds(graphNodes, 
RELATIONSHIP_STORM_TOPOLOGY_NODES));
 
                 for (AtlasEntity graphNode : graphNodes) {
                     entity.addReferredEntity(graphNode);
@@ -144,7 +148,7 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
             }
         }
 
-        topology.setAttribute("inputs", 
AtlasTypeUtil.getAtlasObjectIds(inputs));
+        topology.setRelationshipAttribute("inputs", 
AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, 
RELATIONSHIP_DATASET_PROCESS_INPUTS));
     }
 
     private void addTopologyOutputs(StormTopology stormTopology, String 
topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo 
entityExtInfo) {
@@ -162,7 +166,7 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
             }
         }
 
-        topology.setAttribute("outputs", 
AtlasTypeUtil.getAtlasObjectIds(outputs));
+        topology.setRelationshipAttribute("outputs", 
AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, 
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
     }
 
     private AtlasEntity addDataSet(String dataSetType, String topologyOwner, 
Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
@@ -272,7 +276,7 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
                     ret = new AtlasEntity("hive_table");
 
                     ret.setAttribute(AtlasClient.NAME, tblName);
-                    ret.setAttribute(ATTRIBUTE_DB, 
AtlasTypeUtil.getAtlasObjectId(dbEntity));
+                    ret.setRelationshipAttribute(ATTRIBUTE_DB, 
AtlasTypeUtil.getAtlasRelatedObjectId(dbEntity, "hive_table_db"));
                     ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName));
                 }
             }

Reply via email to