Repository: atlas
Updated Branches:
  refs/heads/master 75415862c -> 54c31d5c8
ATLAS-2545: updated Storm hook to use V2 notifications

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/54c31d5c
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/54c31d5c
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/54c31d5c

Branch: refs/heads/master
Commit: 54c31d5c8e601757e19e26d3c30f2414532e2f8f
Parents: 7541586
Author: rdsolani <[email protected]>
Authored: Mon Apr 9 16:57:36 2018 +0530
Committer: Madhan Neethiraj <[email protected]>
Committed: Tue Apr 10 13:48:27 2018 -0700

----------------------------------------------------------------------
 addons/storm-bridge/pom.xml                     |  44 +++
 .../apache/atlas/storm/hook/StormAtlasHook.java | 394 +++++++++----------
 pom.xml                                         |   6 +
 3 files changed, 247 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/54c31d5c/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index 3446dcb..484902c 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -254,6 +254,11 @@
                         <artifactId>commons-configuration</artifactId>
                         <version>${commons-conf.version}</version>
                     </artifactItem>
+                    <artifactItem>
+                        <groupId>org.apache.commons</groupId>
+                        <artifactId>commons-configuration2</artifactId>
+                        <version>${commons-conf2.version}</version>
+                    </artifactItem>
                     <artifactItem>
                         <groupId>commons-logging</groupId>
                         <artifactId>commons-logging</artifactId>
@@ -295,6 +300,40 @@
                         <version>${hadoop.version}</version>
                     </artifactItem>
                     <artifactItem>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-auth</artifactId>
+                        <version>${hadoop.version}</version>
+                    </artifactItem>
+
+                    <artifactItem>
+                        <groupId>com.fasterxml.jackson.core</groupId>
+                        <artifactId>jackson-databind</artifactId>
+                        <version>${jackson.version}</version>
+                    </artifactItem>
+
+                    <artifactItem>
+                        <groupId>com.fasterxml.jackson.core</groupId>
+                        <artifactId>jackson-core</artifactId>
+                        <version>${jackson.version}</version>
+                    </artifactItem>
+
+                    <artifactItem>
+                        <groupId>com.fasterxml.jackson.core</groupId>
+                        <artifactId>jackson-annotations</artifactId>
+                        <version>${jackson.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                        <groupId>org.codehaus.woodstox</groupId>
+                        <artifactId>stax2-api</artifactId>
+                        <version>${codehaus.woodstox.stax2-api.version}</version>
+                    </artifactItem>
+                    <artifactItem>
+                        <groupId>org.apache.hadoop</groupId>
+                        <artifactId>hadoop-hdfs-client</artifactId>
+                        <version>${hadoop.hdfs-client.version}</version>
+                    </artifactItem>
+
+                    <artifactItem>
                         <groupId>log4j</groupId>
                         <artifactId>log4j</artifactId>
                         <version>${log4j.version}</version>
@@ -309,6 +348,11 @@
                         <artifactId>jsr311-api</artifactId>
                         <version>${jsr.version}</version>
                     </artifactItem>
+                    <artifactItem>
+                        <groupId>com.fasterxml.woodstox</groupId>
+                        <artifactId>woodstox-core</artifactId>
+                        <version>${woodstox-core.version}</version>
+                    </artifactItem>
                 </artifactItems>
             </configuration>
         </execution>


http://git-wip-us.apache.org/repos/asf/atlas/blob/54c31d5c/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 110ec52..7f725a4 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -18,8 +18,14 @@

 package org.apache.atlas.storm.hook;

+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.utils.HdfsNameServiceResolver;
-import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.storm.ISubmitterHook;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.SpoutSpec;
@@ -40,6 +46,7 @@ import org.slf4j.Logger;

 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,17 +61,13 @@ import java.util.Date;
  * for the various lifecycle stages.
  */
 public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
-    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class);
-
-    private static final String CONF_PREFIX = "atlas.hook.storm.";
-    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-    // will be used for owner if Storm topology does not contain the owner instance
-    // possible if Storm is running in unsecure mode.
-    public static final String ANONYMOUS_OWNER = "anonymous";
-
-    public static final String HBASE_NAMESPACE_DEFAULT = "default";
-    public static final String ATTRIBUTE_DB = "db";
+    private static final String CONF_PREFIX      = "atlas.hook.storm.";
+    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+
+    public static final String ANONYMOUS_OWNER         = "anonymous"; // if Storm topology does not contain the owner instance; possible if Storm is running in unsecure mode.
+    public static final String HBASE_NAMESPACE_DEFAULT = "default";
+    public static final String ATTRIBUTE_DB            = "db";

     private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance();

@@ -81,112 +84,103 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
      * @param stormTopology a storm topology
      */
     @Override
-    public void notify(TopologyInfo topologyInfo, Map stormConf,
-                       StormTopology stormTopology) {
-
+    public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology stormTopology) {
         LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name());
+
         try {
-            ArrayList<Referenceable> entities = new ArrayList<>();
-            Referenceable topologyReferenceable = createTopologyInstance(topologyInfo, stormConf);
-            List<Referenceable> dependentEntities = addTopologyDataSets(stormTopology, topologyReferenceable,
-                    topologyInfo.get_owner(), stormConf);
-            if (dependentEntities.size()>0) {
-                entities.addAll(dependentEntities);
-            }
+            String                   user     = getUser(topologyInfo.get_owner(), null);
+            AtlasEntity              topology = createTopologyInstance(topologyInfo, stormConf);
+            AtlasEntitiesWithExtInfo entity   = new AtlasEntitiesWithExtInfo(topology);
+
+            addTopologyDataSets(stormTopology, topologyInfo.get_owner(), stormConf, topology, entity);
+
             // create the graph for the topology
-            ArrayList<Referenceable> graphNodes = createTopologyGraph(
-                    stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
-            // add the connection from topology to the graph
-            topologyReferenceable.set("nodes", graphNodes);
-            entities.add(topologyReferenceable);
-
-            LOG.debug("notifying entities, size = {}", entities.size());
-            String user = getUser(topologyInfo.get_owner(), null);
-            notifyEntities(user, entities);
+            List<AtlasEntity> graphNodes = createTopologyGraph(stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
+
+            if (CollectionUtils.isNotEmpty(graphNodes)) {
+                // add the connection from topology to the graph
+                topology.setAttribute("nodes", AtlasTypeUtil.getAtlasObjectIds(graphNodes));
+
+                for (AtlasEntity graphNode : graphNodes) {
+                    entity.addReferredEntity(graphNode);
+                }
+            }
+
+            List<HookNotification> hookNotifications = Collections.singletonList(new EntityCreateRequestV2(user, entity));
+
+            notifyEntities(hookNotifications);
         } catch (Exception e) {
             throw new RuntimeException("Atlas hook is unable to process the topology.", e);
         }
     }

-    private Referenceable createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) {
-        Referenceable topologyReferenceable = new Referenceable(
-                StormDataTypes.STORM_TOPOLOGY.getName());
-        topologyReferenceable.set("id", topologyInfo.get_id());
-        topologyReferenceable.set(AtlasClient.NAME, topologyInfo.get_name());
-        topologyReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
-        String owner = topologyInfo.get_owner();
+    private AtlasEntity createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) {
+        AtlasEntity topology = new AtlasEntity(StormDataTypes.STORM_TOPOLOGY.getName());
+        String      owner    = topologyInfo.get_owner();
+
         if (StringUtils.isEmpty(owner)) {
             owner = ANONYMOUS_OWNER;
         }
-        topologyReferenceable.set(AtlasClient.OWNER, owner);
-        topologyReferenceable.set("startTime", new Date(System.currentTimeMillis()));
-        topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
-        return topologyReferenceable;
+
+        topology.setAttribute("id", topologyInfo.get_id());
+        topology.setAttribute(AtlasClient.NAME, topologyInfo.get_name());
+        topology.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
+        topology.setAttribute(AtlasClient.OWNER, owner);
+        topology.setAttribute("startTime", new Date(System.currentTimeMillis()));
+        topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
+
+        return topology;
     }

-    private List<Referenceable> addTopologyDataSets(StormTopology stormTopology,
-                                                    Referenceable topologyReferenceable,
-                                                    String topologyOwner,
-                                                    Map stormConf) {
-        List<Referenceable> dependentEntities = new ArrayList<>();
+    private void addTopologyDataSets(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) {
         // add each spout as an input data set
-        addTopologyInputs(topologyReferenceable,
-                stormTopology.get_spouts(), stormConf, topologyOwner, dependentEntities);
+        addTopologyInputs(stormTopology.get_spouts(), stormConf, topologyOwner, topology, entityExtInfo);
+
         // add the appropriate bolts as output data sets
-        addTopologyOutputs(topologyReferenceable, stormTopology, topologyOwner, stormConf, dependentEntities);
-        return dependentEntities;
+        addTopologyOutputs(stormTopology, topologyOwner, stormConf, topology, entityExtInfo);
     }

-    private void addTopologyInputs(Referenceable topologyReferenceable,
-                                   Map<String, SpoutSpec> spouts,
-                                   Map stormConf,
-                                   String topologyOwner, List<Referenceable> dependentEntities) {
-        final ArrayList<Referenceable> inputDataSets = new ArrayList<>();
+    private void addTopologyInputs(Map<String, SpoutSpec> spouts, Map stormConf, String topologyOwner, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) {
+        List<AtlasEntity> inputs = new ArrayList<>();
+
         for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
-            Serializable instance = Utils.javaDeserialize(
-                    entry.getValue().get_spout_object().get_serialized_java(), Serializable.class);
+            Serializable instance = Utils.javaDeserialize(entry.getValue().get_spout_object().get_serialized_java(), Serializable.class);
+            String       dsType   = instance.getClass().getSimpleName();
+            AtlasEntity  dsEntity = addDataSet(dsType, topologyOwner, instance, stormConf, entityExtInfo);

-            String simpleName = instance.getClass().getSimpleName();
-            final Referenceable datasetRef = createDataSet(simpleName, topologyOwner, instance, stormConf, dependentEntities);
-            if (datasetRef != null) {
-                inputDataSets.add(datasetRef);
+            if (dsEntity != null) {
+                inputs.add(dsEntity);
             }
         }

-        topologyReferenceable.set("inputs", inputDataSets);
+        topology.setAttribute("inputs", AtlasTypeUtil.getAtlasObjectIds(inputs));
     }

-    private void addTopologyOutputs(Referenceable topologyReferenceable,
-                                    StormTopology stormTopology, String topologyOwner,
-                                    Map stormConf, List<Referenceable> dependentEntities) {
-        final ArrayList<Referenceable> outputDataSets = new ArrayList<>();
-
-        Map<String, Bolt> bolts = stormTopology.get_bolts();
-        Set<String> terminalBoltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
-        for (String terminalBoltName : terminalBoltNames) {
-            Serializable instance = Utils.javaDeserialize(bolts.get(terminalBoltName)
-                    .get_bolt_object().get_serialized_java(), Serializable.class);
-
-            String dataSetType = instance.getClass().getSimpleName();
-            final Referenceable datasetRef = createDataSet(dataSetType, topologyOwner, instance, stormConf, dependentEntities);
-            if (datasetRef != null) {
-                outputDataSets.add(datasetRef);
+    private void addTopologyOutputs(StormTopology stormTopology, String topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo entityExtInfo) {
+        List<AtlasEntity> outputs   = new ArrayList<>();
+        Map<String, Bolt> bolts     = stormTopology.get_bolts();
+        Set<String>       boltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
+
+        for (String boltName : boltNames) {
+            Serializable instance = Utils.javaDeserialize(bolts.get(boltName).get_bolt_object().get_serialized_java(), Serializable.class);
+            String       dsType   = instance.getClass().getSimpleName();
+            AtlasEntity  dsEntity = addDataSet(dsType, topologyOwner, instance, stormConf, entityExtInfo);
+
+            if (dsEntity != null) {
+                outputs.add(dsEntity);
             }
         }

-        topologyReferenceable.set("outputs", outputDataSets);
+        topology.setAttribute("outputs", AtlasTypeUtil.getAtlasObjectIds(outputs));
     }

-    private Referenceable createDataSet(String name, String topologyOwner,
-                                        Serializable instance,
-                                        Map stormConf, List<Referenceable> dependentEntities) {
-        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);
+    private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
+        Map<String, String> config      = StormTopologyUtil.getFieldValues(instance, true, null);
+        String              clusterName = null;
+        AtlasEntity         ret         = null;

-        String clusterName = null;
-        Referenceable dataSetReferenceable;
         // todo: need to redo this with a config driven approach
-        switch (name) {
+        switch (dataSetType) {
             case "KafkaSpout": {
                 String topicName = config.get("KafkaSpout.kafkaSpoutConfig.translator.topic");
                 String uri = config.get("KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers");
@@ -199,21 +193,23 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                     uri = config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr");
                 }

-                dataSetReferenceable = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
-                dataSetReferenceable.set("topic", topicName);
-                dataSetReferenceable.set("uri", uri);
-
                 if (StringUtils.isEmpty(topologyOwner)) {
                     topologyOwner = ANONYMOUS_OWNER;
                 }
-                dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner);
-                dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
-                dataSetReferenceable.set(AtlasClient.NAME, topicName);
+
+                clusterName = getClusterName(stormConf);
+
+                ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
+
+                ret.setAttribute("topic", topicName);
+                ret.setAttribute("uri", uri);
+                ret.setAttribute(AtlasClient.OWNER, topologyOwner);
+                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(clusterName, topicName));
+                ret.setAttribute(AtlasClient.NAME, topicName);
             }
             break;

             case "HBaseBolt": {
-                dataSetReferenceable = new Referenceable(StormDataTypes.HBASE_TABLE.getName());
                 final String hbaseTableName = config.get("HBaseBolt.tableName");
                 String uri = config.get("hbase.rootdir");
@@ -221,191 +217,195 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                     uri = hbaseTableName;
                 }

-                dataSetReferenceable.set("uri", hbaseTableName);
-                dataSetReferenceable.set(AtlasClient.NAME, uri);
-                dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
                 clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
+
+                ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
+
+                ret.setAttribute("uri", hbaseTableName);
+                ret.setAttribute(AtlasClient.NAME, uri);
stormConf.get("storm.kerberos.principal")); //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName - dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, - hbaseTableName)); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName)); } break; - case "HdfsBolt": - dataSetReferenceable = new Referenceable(HiveMetaStoreBridge.HDFS_PATH); - String hdfsUri = config.get("HdfsBolt.rotationActions") == null - ? config.get("HdfsBolt.fileNameFormat.path") - : config.get("HdfsBolt.rotationActions"); - + case "HdfsBolt": { + final String hdfsUri = config.get("HdfsBolt.rotationActions") == null ? config.get("HdfsBolt.fileNameFormat.path") : config.get("HdfsBolt.rotationActions"); final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri; + final Path hdfsPath = new Path(hdfsPathStr); final String nameServiceID = hdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr); clusterName = getClusterName(stormConf); - dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); + ret = new AtlasEntity(HiveMetaStoreBridge.HDFS_PATH); + + ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); + ret.setAttribute(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal")); + ret.setAttribute(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase()); + if (StringUtils.isNotEmpty(nameServiceID)) { String updatedPath = hdfsNameServiceResolver.getPathWithNameServiceID(hdfsPathStr); - dataSetReferenceable.set("path", updatedPath); - dataSetReferenceable.set("nameServiceId", nameServiceID); - dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedPath)); + + ret.setAttribute("path", updatedPath); + ret.setAttribute("nameServiceId", nameServiceID); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedPath)); } else { - dataSetReferenceable.set("path", hdfsPathStr); - dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, hdfsPathStr)); + ret.setAttribute("path", hdfsPathStr); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, hdfsPathStr)); } - dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal")); - final Path hdfsPath = new Path(hdfsPathStr); - dataSetReferenceable.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase()); - break; + } + break; - case "HiveBolt": - // todo: verify if hive table has everything needed to retrieve existing table - Referenceable dbReferenceable = new Referenceable("hive_db"); - String databaseName = config.get("HiveBolt.options.databaseName"); - dbReferenceable.set(AtlasClient.NAME, databaseName); - dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName)); - dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); - dependentEntities.add(dbReferenceable); + case "HiveBolt": { clusterName = extractComponentClusterName(new HiveConf(), stormConf); - final String hiveTableName = config.get("HiveBolt.options.tableName"); - dataSetReferenceable = new Referenceable("hive_table"); - 
-                final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName,
-                        databaseName, hiveTableName);
-                dataSetReferenceable.set(AtlasClient.NAME, hiveTableName);
-                dataSetReferenceable.set(ATTRIBUTE_DB, dbReferenceable);
-                dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
-                break;
+
+                final String dbName           = config.get("HiveBolt.options.databaseName");
+                final String tblName          = config.get("HiveBolt.options.tableName");
+                final String tblQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName);
+
+                AtlasEntity dbEntity = new AtlasEntity("hive_db");
+
+                dbEntity.setAttribute(AtlasClient.NAME, dbName);
+                dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName));
+                dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
+
+                entityExtInfo.addReferredEntity(dbEntity);
+
+                // todo: verify if hive table has everything needed to retrieve existing table
+                ret = new AtlasEntity("hive_table");
+
+                ret.setAttribute(AtlasClient.NAME, tblName);
+                ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity));
+                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tblQualifiedName);
+            }
+            break;

             default:
                 // custom node - create a base dataset class with name attribute
                 //TODO - What should we do for custom data sets. Not sure what name we can set here?
                 return null;
         }

-        dependentEntities.add(dataSetReferenceable);
-
-
-        return dataSetReferenceable;
-    }
-
-    private String extractComponentClusterName(Configuration configuration, Map stormConf) {
-        String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
-        if (clusterName == null) {
-            clusterName = getClusterName(stormConf);
+        if (ret != null) {
+            entityExtInfo.addReferredEntity(ret);
         }
-        return clusterName;
-    }

+        return ret;
+    }

-    private ArrayList<Referenceable> createTopologyGraph(StormTopology stormTopology,
-                                                         Map<String, SpoutSpec> spouts,
-                                                         Map<String, Bolt> bolts) {
+    private List<AtlasEntity> createTopologyGraph(StormTopology stormTopology, Map<String, SpoutSpec> spouts, Map<String, Bolt> bolts) {
         // Add graph of nodes in the topology
-        final Map<String, Referenceable> nodeEntities = new HashMap<>();
+        Map<String, AtlasEntity> nodeEntities = new HashMap<>();
+
         addSpouts(spouts, nodeEntities);
         addBolts(bolts, nodeEntities);

         addGraphConnections(stormTopology, nodeEntities);

-        ArrayList<Referenceable> nodes = new ArrayList<>();
-        nodes.addAll(nodeEntities.values());
-        return nodes;
+        return new ArrayList<>(nodeEntities.values());
     }

-    private void addSpouts(Map<String, SpoutSpec> spouts,
-                           Map<String, Referenceable> nodeEntities) {
+    private void addSpouts(Map<String, SpoutSpec> spouts, Map<String, AtlasEntity> nodeEntities) {
         for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
-            final String spoutName = entry.getKey();
-            Referenceable spoutReferenceable = createSpoutInstance(
-                    spoutName, entry.getValue());
-            nodeEntities.put(spoutName, spoutReferenceable);
-        }
-    }
+            String      spoutName = entry.getKey();
+            AtlasEntity spout     = createSpoutInstance(spoutName, entry.getValue());

-    private Referenceable createSpoutInstance(String spoutName,
-                                              SpoutSpec stormSpout) {
-        Referenceable spoutReferenceable = new Referenceable(StormDataTypes.STORM_SPOUT.getName());
-        spoutReferenceable.set(AtlasClient.NAME, spoutName);
-
-        Serializable instance = Utils.javaDeserialize(
-                stormSpout.get_spout_object().get_serialized_java(), Serializable.class);
-        spoutReferenceable.set("driverClass", instance.getClass().getName());
-
-        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
-        spoutReferenceable.set("conf", flatConfigMap);
-
-        return spoutReferenceable;
+            nodeEntities.put(spoutName, spout);
+        }
     }

-    private void addBolts(Map<String, Bolt> bolts,
-                          Map<String, Referenceable> nodeEntities) {
+    private void addBolts(Map<String, Bolt> bolts, Map<String, AtlasEntity> nodeEntities) {
         for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
-            Referenceable boltInstance = createBoltInstance(entry.getKey(), entry.getValue());
-            nodeEntities.put(entry.getKey(), boltInstance);
+            String      boltName     = entry.getKey();
+            AtlasEntity boltInstance = createBoltInstance(boltName, entry.getValue());
+
+            nodeEntities.put(boltName, boltInstance);
         }
     }

-    private Referenceable createBoltInstance(String boltName,
-                                             Bolt stormBolt) {
-        Referenceable boltReferenceable = new Referenceable(StormDataTypes.STORM_BOLT.getName());
+    private AtlasEntity createSpoutInstance(String spoutName, SpoutSpec stormSpout) {
+        AtlasEntity spout = new AtlasEntity(StormDataTypes.STORM_SPOUT.getName());
+        Serializable instance = Utils.javaDeserialize(stormSpout.get_spout_object().get_serialized_java(), Serializable.class);
+        Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);

-        boltReferenceable.set(AtlasClient.NAME, boltName);
+        spout.setAttribute(AtlasClient.NAME, spoutName);
+        spout.setAttribute("driverClass", instance.getClass().getName());
+        spout.setAttribute("conf", flatConfigMap);

-        Serializable instance = Utils.javaDeserialize(
-                stormBolt.get_bolt_object().get_serialized_java(), Serializable.class);
-        boltReferenceable.set("driverClass", instance.getClass().getName());
+        return spout;
+    }

+    private AtlasEntity createBoltInstance(String boltName, Bolt stormBolt) {
+        AtlasEntity bolt = new AtlasEntity(StormDataTypes.STORM_BOLT.getName());
+        Serializable instance = Utils.javaDeserialize(stormBolt.get_bolt_object().get_serialized_java(), Serializable.class);
         Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
-        boltReferenceable.set("conf", flatConfigMap);

-        return boltReferenceable;
+        bolt.setAttribute(AtlasClient.NAME, boltName);
+        bolt.setAttribute("driverClass", instance.getClass().getName());
+        bolt.setAttribute("conf", flatConfigMap);
+
+        return bolt;
     }

-    private void addGraphConnections(StormTopology stormTopology,
-                                     Map<String, Referenceable> nodeEntities) {
+    private void addGraphConnections(StormTopology stormTopology, Map<String, AtlasEntity> nodeEntities) {
         // adds connections between spouts and bolts
-        Map<String, Set<String>> adjacencyMap =
-                StormTopologyUtil.getAdjacencyMap(stormTopology, true);
+        Map<String, Set<String>> adjacencyMap = StormTopologyUtil.getAdjacencyMap(stormTopology, true);

         for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) {
-            String nodeName = entry.getKey();
+            String      nodeName      = entry.getKey();
             Set<String> adjacencyList = adjacencyMap.get(nodeName);
-            if (adjacencyList == null || adjacencyList.isEmpty()) {
+
+            if (CollectionUtils.isEmpty(adjacencyList)) {
                 continue;
             }

             // add outgoing links
-            Referenceable node = nodeEntities.get(nodeName);
-            ArrayList<String> outputs = new ArrayList<>(adjacencyList.size());
+            AtlasEntity  node    = nodeEntities.get(nodeName);
+            List<String> outputs = new ArrayList<>(adjacencyList.size());
+
             outputs.addAll(adjacencyList);
-            node.set("outputs", outputs);
node.setAttribute("outputs", outputs); // add incoming links for (String adjacentNodeName : adjacencyList) { - Referenceable adjacentNode = nodeEntities.get(adjacentNodeName); + AtlasEntity adjacentNode = nodeEntities.get(adjacentNodeName); @SuppressWarnings("unchecked") - ArrayList<String> inputs = (ArrayList<String>) adjacentNode.get("inputs"); + List<String> inputs = (List<String>) adjacentNode.getAttribute("inputs"); + if (inputs == null) { inputs = new ArrayList<>(); } + inputs.add(nodeName); - adjacentNode.set("inputs", inputs); + adjacentNode.setAttribute("inputs", inputs); } } } public static String getKafkaTopicQualifiedName(String clusterName, String topicName) { - return String.format("%s@%s", topicName, clusterName); + return String.format("%s@%s", topicName.toLowerCase(), clusterName); } public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) { - return String.format("%s.%s@%s", nameSpace, tableName, clusterName); + return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName); } public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) { - return String.format("%s@%s", hdfsPath, clusterName); + return String.format("%s@%s", hdfsPath.toLowerCase(), clusterName); } private String getClusterName(Map stormConf) { return atlasProperties.getString(AtlasConstants.CLUSTER_NAME_KEY, AtlasConstants.DEFAULT_CLUSTER_NAME); } + + private String extractComponentClusterName(Configuration configuration, Map stormConf) { + String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null); + + if (clusterName == null) { + clusterName = getClusterName(stormConf); + } + + return clusterName; + } + } http://git-wip-us.apache.org/repos/asf/atlas/blob/54c31d5c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1934abf..a6d1268 100644 --- a/pom.xml +++ b/pom.xml @@ -570,6 +570,8 @@ <aopalliance.version>1.0</aopalliance.version> <jackson.version>2.9.2</jackson.version> <commons-conf.version>1.10</commons-conf.version> + <commons-conf2.version>2.2</commons-conf2.version> + <commons-collections.version>3.2.2</commons-collections.version> <commons-logging.version>1.1.3</commons-logging.version> <commons-lang.version>2.6</commons-lang.version> @@ -582,6 +584,10 @@ <maven-site-plugin.version>3.7</maven-site-plugin.version> <doxia.version>1.8</doxia.version> <dropwizard-metrics>3.2.2</dropwizard-metrics> + <!-- hadoop.hdfs-client.version should same as hadoop version --> + <hadoop.hdfs-client.version>2.8.1</hadoop.hdfs-client.version> + <codehaus.woodstox.stax2-api.version>3.1.4</codehaus.woodstox.stax2-api.version> + <woodstox-core.version>5.0.3</woodstox-core.version> <PermGen>64m</PermGen> <MaxPermGen>512m</MaxPermGen>
