Repository: atlas Updated Branches: refs/heads/master 57c7e85ec -> fe1c7a3b4
ATLAS-2592 Storm atlas hook fails with NPE Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/fe1c7a3b Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/fe1c7a3b Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/fe1c7a3b Branch: refs/heads/master Commit: fe1c7a3b48a751c20e51ee7d97c30da84af21b85 Parents: 57c7e85 Author: rdsolani <[email protected]> Authored: Mon Apr 23 14:35:00 2018 +0530 Committer: Madhan Neethiraj <[email protected]> Committed: Mon Apr 23 17:28:56 2018 -0700 ---------------------------------------------------------------------- .../apache/atlas/storm/hook/StormAtlasHook.java | 61 ++++++++++++-------- 1 file changed, 36 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/fe1c7a3b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 7f725a4..fdce5eb 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -199,13 +199,17 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { clusterName = getClusterName(stormConf); - ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName()); + if (topicName == null) { + LOG.error("Kafka topic name not found"); + } else { + ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName()); - ret.setAttribute("topic", topicName); - ret.setAttribute("uri", uri); - ret.setAttribute(AtlasClient.OWNER, topologyOwner); - ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(clusterName, topicName)); - ret.setAttribute(AtlasClient.NAME, topicName); + ret.setAttribute("topic", topicName); + ret.setAttribute("uri", uri); + ret.setAttribute(AtlasClient.OWNER, topologyOwner); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(clusterName, topicName)); + ret.setAttribute(AtlasClient.NAME, topicName); + } } break; @@ -219,13 +223,17 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf); - ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName()); + if (hbaseTableName == null) { + LOG.error("HBase table name not found"); + } else { + ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName()); - ret.setAttribute("uri", hbaseTableName); - ret.setAttribute(AtlasClient.NAME, uri); - ret.setAttribute(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal")); - //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName - ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName)); + ret.setAttribute("uri", hbaseTableName); + ret.setAttribute(AtlasClient.NAME, uri); + ret.setAttribute(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal")); + //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName)); + } } break; @@ -259,24 +267,27 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { case "HiveBolt": { clusterName = extractComponentClusterName(new HiveConf(), stormConf); - final String dbName = config.get("HiveBolt.options.databaseName"); - final String tblName = config.get("HiveBolt.options.tableName"); - final String tblQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName); + final String dbName = config.get("HiveBolt.options.databaseName"); + final String tblName = config.get("HiveBolt.options.tableName"); - AtlasEntity dbEntity = new AtlasEntity("hive_db"); + if (dbName == null || tblName ==null) { + LOG.error("Hive database or table name not found"); + } else { + AtlasEntity dbEntity = new AtlasEntity("hive_db"); - dbEntity.setAttribute(AtlasClient.NAME, dbName); - dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName)); - dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); + dbEntity.setAttribute(AtlasClient.NAME, dbName); + dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName)); + dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); - entityExtInfo.addReferredEntity(dbEntity); + entityExtInfo.addReferredEntity(dbEntity); - // todo: verify if hive table has everything needed to retrieve existing table - ret = new AtlasEntity("hive_table"); + // todo: verify if hive table has everything needed to retrieve existing table + ret = new AtlasEntity("hive_table"); - ret.setAttribute(AtlasClient.NAME, tblName); - ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity)); - ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tblQualifiedName); + ret.setAttribute(AtlasClient.NAME, tblName); + ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity)); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName)); + } } break;
