Repository: atlas
Updated Branches:
  refs/heads/master 57c7e85ec -> fe1c7a3b4


ATLAS-2592 Storm atlas hook fails with NPE

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/fe1c7a3b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/fe1c7a3b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/fe1c7a3b

Branch: refs/heads/master
Commit: fe1c7a3b48a751c20e51ee7d97c30da84af21b85
Parents: 57c7e85
Author: rdsolani <[email protected]>
Authored: Mon Apr 23 14:35:00 2018 +0530
Committer: Madhan Neethiraj <[email protected]>
Committed: Mon Apr 23 17:28:56 2018 -0700

----------------------------------------------------------------------
 .../apache/atlas/storm/hook/StormAtlasHook.java | 61 ++++++++++++--------
 1 file changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/fe1c7a3b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git 
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
 
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 7f725a4..fdce5eb 100644
--- 
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ 
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -199,13 +199,17 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
 
                 clusterName = getClusterName(stormConf);
 
-                ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
+                if (topicName == null) {
+                    LOG.error("Kafka topic name not found");
+                } else {
+                    ret = new 
AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
 
-                ret.setAttribute("topic", topicName);
-                ret.setAttribute("uri", uri);
-                ret.setAttribute(AtlasClient.OWNER, topologyOwner);
-                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getKafkaTopicQualifiedName(clusterName, topicName));
-                ret.setAttribute(AtlasClient.NAME, topicName);
+                    ret.setAttribute("topic", topicName);
+                    ret.setAttribute("uri", uri);
+                    ret.setAttribute(AtlasClient.OWNER, topologyOwner);
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getKafkaTopicQualifiedName(clusterName, topicName));
+                    ret.setAttribute(AtlasClient.NAME, topicName);
+                }
             }
             break;
 
@@ -219,13 +223,17 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
 
                 clusterName = 
extractComponentClusterName(HBaseConfiguration.create(), stormConf);
 
-                ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
+                if (hbaseTableName == null) {
+                    LOG.error("HBase table name not found");
+                } else {
+                    ret = new 
AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
 
-                ret.setAttribute("uri", hbaseTableName);
-                ret.setAttribute(AtlasClient.NAME, uri);
-                ret.setAttribute(AtlasClient.OWNER, 
stormConf.get("storm.kerberos.principal"));
-                //TODO - Hbase Namespace is hardcoded to 'default'. need to 
check how to get this or is it already part of tableName
-                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, 
hbaseTableName));
+                    ret.setAttribute("uri", hbaseTableName);
+                    ret.setAttribute(AtlasClient.NAME, uri);
+                    ret.setAttribute(AtlasClient.OWNER, 
stormConf.get("storm.kerberos.principal"));
+                    //TODO - Hbase Namespace is hardcoded to 'default'. need 
to check how to get this or is it already part of tableName
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, 
hbaseTableName));
+                }
             }
             break;
 
@@ -259,24 +267,27 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
             case "HiveBolt": {
                 clusterName = extractComponentClusterName(new HiveConf(), 
stormConf);
 
-                final String dbName           = 
config.get("HiveBolt.options.databaseName");
-                final String tblName          = 
config.get("HiveBolt.options.tableName");
-                final String tblQualifiedName = 
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName);
+                final String dbName  = 
config.get("HiveBolt.options.databaseName");
+                final String tblName = 
config.get("HiveBolt.options.tableName");
 
-                AtlasEntity dbEntity = new AtlasEntity("hive_db");
+                if (dbName == null || tblName ==null) {
+                    LOG.error("Hive database or table name not found");
+                } else {
+                    AtlasEntity dbEntity = new AtlasEntity("hive_db");
 
-                dbEntity.setAttribute(AtlasClient.NAME, dbName);
-                
dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName));
-                dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
+                    dbEntity.setAttribute(AtlasClient.NAME, dbName);
+                    
dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName));
+                    
dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
 
-                entityExtInfo.addReferredEntity(dbEntity);
+                    entityExtInfo.addReferredEntity(dbEntity);
 
-                // todo: verify if hive table has everything needed to 
retrieve existing table
-                ret = new AtlasEntity("hive_table");
+                    // todo: verify if hive table has everything needed to 
retrieve existing table
+                    ret = new AtlasEntity("hive_table");
 
-                ret.setAttribute(AtlasClient.NAME, tblName);
-                ret.setAttribute(ATTRIBUTE_DB, 
AtlasTypeUtil.getAtlasObjectId(dbEntity));
-                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
tblQualifiedName);
+                    ret.setAttribute(AtlasClient.NAME, tblName);
+                    ret.setAttribute(ATTRIBUTE_DB, 
AtlasTypeUtil.getAtlasObjectId(dbEntity));
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName));
+                }
             }
             break;
 

Reply via email to