This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 845a2a8b5561e964bb2c24df61aa6f553969b493 Author: Mandar Ambawane <[email protected]> AuthorDate: Thu Jul 18 19:15:11 2019 +0530 ATLAS-3335 Update Sqoop/Storm hook to use relationship attributes Signed-off-by: Sarath Subramanian <[email protected]> (cherry picked from commit 74bfe9478ed26d917d5da77e02309d8b7b4299f9) --- .../main/java/org/apache/atlas/sqoop/hook/SqoopHook.java | 14 +++++++++----- .../java/org/apache/atlas/storm/hook/StormAtlasHook.java | 12 ++++++++---- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index 3ccd426..25ab7cb 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -70,6 +70,10 @@ public class SqoopHook extends SqoopJobDataPublisher { public static final String OUTPUTS = "outputs"; public static final String ATTRIBUTE_DB = "db"; + public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db"; + public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs"; + public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs"; + private static final AtlasHookImpl atlasHook; static { @@ -129,7 +133,7 @@ public class SqoopHook extends SqoopJobDataPublisher { entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase()); entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName); - entHiveTable.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(entHiveDb)); + entHiveTable.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(entHiveDb, RELATIONSHIP_HIVE_TABLE_DB)); return entHiveTable; } @@ -179,11 +183,11 @@ public class SqoopHook extends SqoopJobDataPublisher { List<AtlasObjectId> hiveObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb)); if (isImportOperation(data)) { - entProcess.setAttribute(SqoopHook.INPUTS, sqoopObjects); - entProcess.setAttribute(SqoopHook.OUTPUTS, hiveObjects); + entProcess.setRelationshipAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIdList(sqoopObjects, RELATIONSHIP_DATASET_PROCESS_INPUTS)); + entProcess.setRelationshipAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIdList(hiveObjects, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)); } else { - entProcess.setAttribute(SqoopHook.INPUTS, hiveObjects); - entProcess.setAttribute(SqoopHook.OUTPUTS, sqoopObjects); + entProcess.setRelationshipAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasRelatedObjectIdList(hiveObjects, RELATIONSHIP_DATASET_PROCESS_INPUTS)); + entProcess.setRelationshipAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasRelatedObjectIdList(sqoopObjects, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)); } entProcess.setAttribute(SqoopHook.USER, data.getUser()); diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 517a3c3..cdfdefc 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -67,6 +67,10 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { public static final String HBASE_NAMESPACE_DEFAULT = "default"; public static final String ATTRIBUTE_DB = "db"; + public static final String RELATIONSHIP_STORM_TOPOLOGY_NODES = "storm_topology_nodes"; + public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs"; + public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs"; + /** * This is the client-side hook that storm fires when a topology is added. * @@ -90,7 +94,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { if (CollectionUtils.isNotEmpty(graphNodes)) { // add the connection from topology to the graph - topology.setAttribute("nodes", AtlasTypeUtil.getAtlasObjectIds(graphNodes)); + topology.setRelationshipAttribute("nodes", AtlasTypeUtil.getAtlasRelatedObjectIds(graphNodes, RELATIONSHIP_STORM_TOPOLOGY_NODES)); for (AtlasEntity graphNode : graphNodes) { entity.addReferredEntity(graphNode); @@ -144,7 +148,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { } } - topology.setAttribute("inputs", AtlasTypeUtil.getAtlasObjectIds(inputs)); + topology.setRelationshipAttribute("inputs", AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, RELATIONSHIP_DATASET_PROCESS_INPUTS)); } private void addTopologyOutputs(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) { @@ -162,7 +166,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { } } - topology.setAttribute("outputs", AtlasTypeUtil.getAtlasObjectIds(outputs)); + topology.setRelationshipAttribute("outputs", AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)); } private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) { @@ -272,7 +276,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ret = new AtlasEntity("hive_table"); ret.setAttribute(AtlasClient.NAME, tblName); - ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity)); + ret.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(dbEntity, "hive_table_db")); ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName)); } }
