This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 74bfe94 ATLAS-3335 Update Sqoop/Storm hook to use relationship attributes
74bfe94 is described below
commit 74bfe9478ed26d917d5da77e02309d8b7b4299f9
Author: Mandar Ambawane <[email protected]>
AuthorDate: Thu Jul 18 19:15:11 2019 +0530
ATLAS-3335 Update Sqoop/Storm hook to use relationship attributes
Signed-off-by: Sarath Subramanian <[email protected]>
---
.../main/java/org/apache/atlas/sqoop/hook/SqoopHook.java | 14 +++++++++-----
.../java/org/apache/atlas/storm/hook/StormAtlasHook.java | 12 ++++++++----
2 files changed, 17 insertions(+), 9 deletions(-)
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 3ccd426..25ab7cb 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -70,6 +70,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
public static final String OUTPUTS = "outputs";
public static final String ATTRIBUTE_DB = "db";
+ public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";
+ public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS =
"dataset_process_inputs";
+ public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS =
"process_dataset_outputs";
+
private static final AtlasHookImpl atlasHook;
static {
@@ -129,7 +133,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase());
entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
qualifiedName);
- entHiveTable.setAttribute(ATTRIBUTE_DB,
AtlasTypeUtil.getAtlasObjectId(entHiveDb));
+ entHiveTable.setRelationshipAttribute(ATTRIBUTE_DB,
AtlasTypeUtil.getAtlasRelatedObjectId(entHiveDb, RELATIONSHIP_HIVE_TABLE_DB));
return entHiveTable;
}
@@ -179,11 +183,11 @@ public class SqoopHook extends SqoopJobDataPublisher {
List<AtlasObjectId> hiveObjects =
Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ?
entHiveTable : entHiveDb));
if (isImportOperation(data)) {
- entProcess.setAttribute(SqoopHook.INPUTS, sqoopObjects);
- entProcess.setAttribute(SqoopHook.OUTPUTS, hiveObjects);
+ entProcess.setRelationshipAttribute(SqoopHook.INPUTS,
AtlasTypeUtil.getAtlasRelatedObjectIdList(sqoopObjects,
RELATIONSHIP_DATASET_PROCESS_INPUTS));
+ entProcess.setRelationshipAttribute(SqoopHook.OUTPUTS,
AtlasTypeUtil.getAtlasRelatedObjectIdList(hiveObjects,
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
} else {
- entProcess.setAttribute(SqoopHook.INPUTS, hiveObjects);
- entProcess.setAttribute(SqoopHook.OUTPUTS, sqoopObjects);
+ entProcess.setRelationshipAttribute(SqoopHook.INPUTS,
AtlasTypeUtil.getAtlasRelatedObjectIdList(hiveObjects,
RELATIONSHIP_DATASET_PROCESS_INPUTS));
+ entProcess.setRelationshipAttribute(SqoopHook.OUTPUTS,
AtlasTypeUtil.getAtlasRelatedObjectIdList(sqoopObjects,
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
}
entProcess.setAttribute(SqoopHook.USER, data.getUser());
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 517a3c3..cdfdefc 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -67,6 +67,10 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
public static final String HBASE_NAMESPACE_DEFAULT = "default";
public static final String ATTRIBUTE_DB = "db";
+ public static final String RELATIONSHIP_STORM_TOPOLOGY_NODES =
"storm_topology_nodes";
+ public static final String RELATIONSHIP_DATASET_PROCESS_INPUTS =
"dataset_process_inputs";
+ public static final String RELATIONSHIP_PROCESS_DATASET_OUTPUTS =
"process_dataset_outputs";
+
/**
* This is the client-side hook that storm fires when a topology is added.
*
@@ -90,7 +94,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
if (CollectionUtils.isNotEmpty(graphNodes)) {
// add the connection from topology to the graph
- topology.setAttribute("nodes",
AtlasTypeUtil.getAtlasObjectIds(graphNodes));
+ topology.setRelationshipAttribute("nodes",
AtlasTypeUtil.getAtlasRelatedObjectIds(graphNodes,
RELATIONSHIP_STORM_TOPOLOGY_NODES));
for (AtlasEntity graphNode : graphNodes) {
entity.addReferredEntity(graphNode);
@@ -144,7 +148,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
}
}
- topology.setAttribute("inputs",
AtlasTypeUtil.getAtlasObjectIds(inputs));
+ topology.setRelationshipAttribute("inputs",
AtlasTypeUtil.getAtlasRelatedObjectIds(inputs,
RELATIONSHIP_DATASET_PROCESS_INPUTS));
}
private void addTopologyOutputs(StormTopology stormTopology, String
topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo
entityExtInfo) {
@@ -162,7 +166,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
}
}
- topology.setAttribute("outputs",
AtlasTypeUtil.getAtlasObjectIds(outputs));
+ topology.setRelationshipAttribute("outputs",
AtlasTypeUtil.getAtlasRelatedObjectIds(outputs,
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
}
private AtlasEntity addDataSet(String dataSetType, String topologyOwner,
Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
@@ -272,7 +276,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
ret = new AtlasEntity("hive_table");
ret.setAttribute(AtlasClient.NAME, tblName);
- ret.setAttribute(ATTRIBUTE_DB,
AtlasTypeUtil.getAtlasObjectId(dbEntity));
+ ret.setRelationshipAttribute(ATTRIBUTE_DB,
AtlasTypeUtil.getAtlasRelatedObjectId(dbEntity, "hive_table_db"));
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName));
}
}