Repository: atlas
Updated Branches:
  refs/heads/master ec00aed1e -> b32e547e4


ATLAS-2492: updated Storm version to 1.2.0


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/5ebd7070
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/5ebd7070
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/5ebd7070

Branch: refs/heads/master
Commit: 5ebd7070ec767c0f27ae4240bfd1481190f38d0d
Parents: ec00aed
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Fri Mar 9 15:29:55 2018 -0800
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Fri Mar 9 15:29:55 2018 -0800

----------------------------------------------------------------------
 addons/storm-bridge-shim/pom.xml                |  2 +-
 addons/storm-bridge/pom.xml                     |  2 +-
 .../apache/atlas/storm/hook/StormAtlasHook.java | 36 +++++++++++++++-----
 3 files changed, 29 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/5ebd7070/addons/storm-bridge-shim/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge-shim/pom.xml b/addons/storm-bridge-shim/pom.xml
index 77d9cc0..c25fbfe 100755
--- a/addons/storm-bridge-shim/pom.xml
+++ b/addons/storm-bridge-shim/pom.xml
@@ -31,7 +31,7 @@
     <packaging>jar</packaging>
 
     <properties>
-        <storm.version>1.0.0</storm.version>
+        <storm.version>1.2.0</storm.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/atlas/blob/5ebd7070/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index 2d41add..2627d92 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -30,7 +30,7 @@
     <packaging>jar</packaging>
 
     <properties>
-        <storm.version>1.0.0</storm.version>
+        <storm.version>1.2.0</storm.version>
         <hive.version>1.2.1</hive.version>
     </properties>
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/5ebd7070/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git 
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
 
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 98bb186..b0a4dab 100644
--- 
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ 
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -186,31 +186,49 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
         Referenceable dataSetReferenceable;
         // todo: need to redo this with a config driven approach
         switch (name) {
-            case "KafkaSpout":
+            case "KafkaSpout": {
+                String topicName = 
config.get("KafkaSpout.kafkaSpoutConfig.translator.topic");
+                String uri       = 
config.get("KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers");
+
+                if (StringUtils.isEmpty(topicName)) {
+                    topicName = config.get("KafkaSpout._spoutConfig.topic");
+                }
+
+                if (StringUtils.isEmpty(uri)) {
+                    uri = 
config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr");
+                }
+
                 dataSetReferenceable = new 
Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
-                final String topicName = 
config.get("KafkaSpout._spoutConfig.topic");
                 dataSetReferenceable.set("topic", topicName);
-                dataSetReferenceable.set("uri",
-                        
config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr"));
+                dataSetReferenceable.set("uri", uri);
+
                 if (StringUtils.isEmpty(topologyOwner)) {
                     topologyOwner = ANONYMOUS_OWNER;
                 }
                 dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner);
                 
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
                 dataSetReferenceable.set(AtlasClient.NAME, topicName);
-                break;
+            }
+            break;
 
-            case "HBaseBolt":
+            case "HBaseBolt": {
                 dataSetReferenceable = new 
Referenceable(StormDataTypes.HBASE_TABLE.getName());
                 final String hbaseTableName = 
config.get("HBaseBolt.tableName");
-                dataSetReferenceable.set("uri", 
stormConf.get("hbase.rootdir"));
-                dataSetReferenceable.set(AtlasClient.NAME, hbaseTableName);
+                String       uri            = config.get("hbase.rootdir");
+
+                if (StringUtils.isEmpty(uri)) {
+                    uri = hbaseTableName;
+                }
+
+                dataSetReferenceable.set("uri", hbaseTableName);
+                dataSetReferenceable.set(AtlasClient.NAME, uri);
                 dataSetReferenceable.set(AtlasClient.OWNER, 
stormConf.get("storm.kerberos.principal"));
                 clusterName = 
extractComponentClusterName(HBaseConfiguration.create(), stormConf);
                 //TODO - Hbase Namespace is hardcoded to 'default'. need to 
check how to get this or is it already part of tableName
                 
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
                         hbaseTableName));
-                break;
+            }
+            break;
 
             case "HdfsBolt":
                 dataSetReferenceable = new 
Referenceable(HiveMetaStoreBridge.HDFS_PATH);

Reply via email to