Repository: atlas Updated Branches: refs/heads/master ec00aed1e -> b32e547e4
ATLAS-2492: updated Storm version to 1.2.0 Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/5ebd7070 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/5ebd7070 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/5ebd7070 Branch: refs/heads/master Commit: 5ebd7070ec767c0f27ae4240bfd1481190f38d0d Parents: ec00aed Author: Madhan Neethiraj <mad...@apache.org> Authored: Fri Mar 9 15:29:55 2018 -0800 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Fri Mar 9 15:29:55 2018 -0800 ---------------------------------------------------------------------- addons/storm-bridge-shim/pom.xml | 2 +- addons/storm-bridge/pom.xml | 2 +- .../apache/atlas/storm/hook/StormAtlasHook.java | 36 +++++++++++++++----- 3 files changed, 29 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/5ebd7070/addons/storm-bridge-shim/pom.xml ---------------------------------------------------------------------- diff --git a/addons/storm-bridge-shim/pom.xml b/addons/storm-bridge-shim/pom.xml index 77d9cc0..c25fbfe 100755 --- a/addons/storm-bridge-shim/pom.xml +++ b/addons/storm-bridge-shim/pom.xml @@ -31,7 +31,7 @@ <packaging>jar</packaging> <properties> - <storm.version>1.0.0</storm.version> + <storm.version>1.2.0</storm.version> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/atlas/blob/5ebd7070/addons/storm-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml index 2d41add..2627d92 100644 --- a/addons/storm-bridge/pom.xml +++ b/addons/storm-bridge/pom.xml @@ -30,7 +30,7 @@ <packaging>jar</packaging> <properties> - <storm.version>1.0.0</storm.version> + <storm.version>1.2.0</storm.version> <hive.version>1.2.1</hive.version> </properties> http://git-wip-us.apache.org/repos/asf/atlas/blob/5ebd7070/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 98bb186..b0a4dab 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -186,31 +186,49 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { Referenceable dataSetReferenceable; // todo: need to redo this with a config driven approach switch (name) { - case "KafkaSpout": + case "KafkaSpout": { + String topicName = config.get("KafkaSpout.kafkaSpoutConfig.translator.topic"); + String uri = config.get("KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers"); + + if (StringUtils.isEmpty(topicName)) { + topicName = config.get("KafkaSpout._spoutConfig.topic"); + } + + if (StringUtils.isEmpty(uri)) { + uri = config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr"); + } + dataSetReferenceable = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName()); - final String topicName = config.get("KafkaSpout._spoutConfig.topic"); dataSetReferenceable.set("topic", topicName); - dataSetReferenceable.set("uri", - config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr")); + dataSetReferenceable.set("uri", uri); + if (StringUtils.isEmpty(topologyOwner)) { topologyOwner = ANONYMOUS_OWNER; } dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner); dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(getClusterName(stormConf), topicName)); dataSetReferenceable.set(AtlasClient.NAME, topicName); - break; + } + break; - case "HBaseBolt": + case "HBaseBolt": { dataSetReferenceable = new Referenceable(StormDataTypes.HBASE_TABLE.getName()); final String hbaseTableName = config.get("HBaseBolt.tableName"); - dataSetReferenceable.set("uri", stormConf.get("hbase.rootdir")); - dataSetReferenceable.set(AtlasClient.NAME, hbaseTableName); + String uri = config.get("hbase.rootdir"); + + if (StringUtils.isEmpty(uri)) { + uri = hbaseTableName; + } + + dataSetReferenceable.set("uri", hbaseTableName); + dataSetReferenceable.set(AtlasClient.NAME, uri); dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal")); clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf); //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName)); - break; + } + break; case "HdfsBolt": dataSetReferenceable = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);