Repository: incubator-atlas Updated Branches: refs/heads/master a46711c54 -> b77d7c7bc
ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat,yhemanth via shwethags) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b77d7c7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b77d7c7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b77d7c7b Branch: refs/heads/master Commit: b77d7c7bc7cb11a047808c95b051eb2fbd2c813c Parents: a46711c Author: Shwetha GS <[email protected]> Authored: Mon Jan 18 17:02:32 2016 +0530 Committer: Shwetha GS <[email protected]> Committed: Mon Jan 18 17:02:32 2016 +0530 ---------------------------------------------------------------------- .../org/apache/atlas/hive/hook/HiveHook.java | 48 +-- addons/storm-bridge/pom.xml | 30 +- .../apache/atlas/storm/hook/StormAtlasHook.java | 427 +++++++++++++++++++ .../atlas/storm/hook/StormTopologyUtil.java | 237 ++++++++++ .../atlas/storm/hook/StormAtlasHookIT.java | 111 +++++ .../atlas/storm/hook/StormAtlasHookTest.java | 68 +++ .../apache/atlas/storm/hook/StormTestUtil.java | 71 +++ .../java/org/apache/atlas/AtlasConstants.java | 26 ++ .../src/main/assemblies/standalone-package.xml | 6 + .../java/org/apache/atlas/hook/AtlasHook.java | 128 ++++++ .../atlas/kafka/KafkaNotificationProvider.java | 3 +- release-log.txt | 1 + 12 files changed, 1114 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 37a3169..2cc37c0 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -20,15 +20,11 @@ package org.apache.atlas.hive.hook; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.Guice; -import com.google.inject.Inject; -import com.google.inject.Injector; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.commons.configuration.Configuration; @@ -63,7 +59,7 @@ import java.util.concurrent.TimeUnit; /** * AtlasHook sends lineage information to the Atlas server.
*/ -public class HiveHook implements ExecuteWithHookContext { +public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final Logger LOG = LoggerFactory.getLogger(HiveHook.class); @@ -103,16 +99,13 @@ public class HiveHook implements ExecuteWithHookContext { public Long queryStartTime; } - @Inject - private static NotificationInterface notifInterface; - private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>(); private static final HiveConf hiveConf; static { try { - atlasProperties = ApplicationProperties.get(ApplicationProperties.CLIENT_PROPERTIES); + atlasProperties = ApplicationProperties.get(); // initialize the async facility to process hook calls. We don't // want to do this inline since it adds plenty of overhead for the query. @@ -142,15 +135,17 @@ public class HiveHook implements ExecuteWithHookContext { LOG.info("Attempting to send msg while shutdown in progress.", e); } - Injector injector = Guice.createInjector(new NotificationModule()); - notifInterface = injector.getInstance(NotificationInterface.class); - hiveConf = new HiveConf(); LOG.info("Created Atlas Hook"); } @Override + protected String getNumberOfRetriesPropertyKey() { + return HOOK_NUM_RETRIES; + } + + @Override public void run(final HookContext hookContext) throws Exception { // clone to avoid concurrent access final HiveEvent event = new HiveEvent(); @@ -233,7 +228,7 @@ public class HiveHook implements ExecuteWithHookContext { default: } - notifyAtlas(); + notifyEntities(messages); } private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { @@ -324,31 +319,6 @@ public class HiveHook implements ExecuteWithHookContext { } } - /** - * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities. - * De-duping of entities is done on server side depending on the unique attribute on the - */ - private void notifyAtlas() { - int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3); - - LOG.debug("Notifying atlas with messages {}", messages); - int numRetries = 0; - while (true) { - try { - notifInterface.send(NotificationInterface.NotificationType.HOOK, messages); - break; - } catch(Exception e) { - numRetries++; - if(numRetries < maxRetries) { - LOG.debug("Failed to notify atlas. Retrying", e); - } else { - LOG.error("Failed to notify atlas after {} retries. 
Quitting", maxRetries, e); - break; - } - } - } - } - private String normalize(String str) { if (StringUtils.isEmpty(str)) { return null; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml index 5ec291c2..7b0d366 100644 --- a/addons/storm-bridge/pom.xml +++ b/addons/storm-bridge/pom.xml @@ -31,6 +31,7 @@ <properties> <storm.version>0.10.0.2.3.99.0-195</storm.version> + <hive.version>1.2.1</hive.version> </properties> <dependencies> @@ -66,6 +67,24 @@ <artifactId>hive-bridge</artifactId> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <version>${hbase.version}</version> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + <!-- apache storm core dependencies --> <dependency> <groupId>org.apache.storm</groupId> @@ -269,7 +288,16 @@ <artifactId>paranamer</artifactId> <version>${paranamer.version}</version> </artifactItem> - + <artifactItem> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <version>${hbase.version}</version> + </artifactItem> </artifactItems> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java new file mode 100644 index 0000000..490d95e --- /dev/null +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -0,0 +1,427 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.storm.hook; + +import backtype.storm.ISubmitterHook; +import backtype.storm.generated.Bolt; +import backtype.storm.generated.SpoutSpec; +import backtype.storm.generated.StormTopology; +import backtype.storm.generated.TopologyInfo; +import backtype.storm.utils.Utils; +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.hive.model.HiveDataModelGenerator; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.storm.model.StormDataModel; +import org.apache.atlas.storm.model.StormDataTypes; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.json.TypesSerialization; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.slf4j.Logger; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * StormAtlasHook sends storm topology metadata information to Atlas + * via a Kafka Broker for durability. + * <p/> + * This is based on the assumption that the same topology name is used + * for the various lifecycle stages. + */ +public class StormAtlasHook extends AtlasHook implements ISubmitterHook { + + public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class); + + private static final String CONF_PREFIX = "atlas.hook.storm."; + private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + // will be used for owner if Storm topology does not contain the owner instance + // possible if Storm is running in unsecure mode. + public static final String ANONYMOUS_OWNER = "anonymous"; + + public static final String HBASE_NAMESPACE_DEFAULT = "default"; + + private static volatile boolean typesRegistered = false; + + public StormAtlasHook() { + super(); + } + + StormAtlasHook(AtlasClient atlasClient) { + super(atlasClient); + } + @Override + protected String getNumberOfRetriesPropertyKey() { + return HOOK_NUM_RETRIES; + } + + /** + * This is the client-side hook that storm fires when a topology is added. + * + * @param topologyInfo topology info + * @param stormConf configuration + * @param stormTopology a storm topology + * @throws IllegalAccessException + */ + @Override + public void notify(TopologyInfo topologyInfo, Map stormConf, + StormTopology stormTopology) throws IllegalAccessException { + + LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name()); + try { + if( ! 
typesRegistered ) { + registerDataModel(new HiveDataModelGenerator()); + } + + ArrayList<Referenceable> entities = new ArrayList<>(); + Referenceable topologyReferenceable = createTopologyInstance(topologyInfo, stormConf); + List<Referenceable> dependentEntities = addTopologyDataSets(stormTopology, topologyReferenceable, + topologyInfo.get_owner(), stormConf); + if (dependentEntities.size()>0) { + entities.addAll(dependentEntities); + } + // create the graph for the topology + ArrayList<Referenceable> graphNodes = createTopologyGraph( + stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts()); + // add the connection from topology to the graph + topologyReferenceable.set("nodes", graphNodes); + entities.add(topologyReferenceable); + + LOG.debug("notifying entities, size = {}", entities.size()); + notifyEntities(entities); + } catch (Exception e) { + throw new RuntimeException("Atlas hook is unable to process the topology.", e); + } + } + + private Referenceable createTopologyInstance(TopologyInfo topologyInfo, Map stormConf) throws Exception { + Referenceable topologyReferenceable = new Referenceable( + StormDataTypes.STORM_TOPOLOGY.getName()); + topologyReferenceable.set("id", topologyInfo.get_id()); + topologyReferenceable.set("name", topologyInfo.get_name()); + String owner = topologyInfo.get_owner(); + if (StringUtils.isEmpty(owner)) { + owner = ANONYMOUS_OWNER; + } + topologyReferenceable.set("owner", owner); + topologyReferenceable.set("startTime", System.currentTimeMillis()); + topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); + + return topologyReferenceable; + } + + private List<Referenceable> addTopologyDataSets(StormTopology stormTopology, + Referenceable topologyReferenceable, + String topologyOwner, + Map stormConf) throws Exception { + List<Referenceable> dependentEntities = new ArrayList<>(); + // add each spout as an input data set + addTopologyInputs(topologyReferenceable, + stormTopology.get_spouts(), stormConf, topologyOwner, dependentEntities); + // add the appropriate bolts as output data sets + addTopologyOutputs(topologyReferenceable, stormTopology, topologyOwner, stormConf, dependentEntities); + return dependentEntities; + } + + private void addTopologyInputs(Referenceable topologyReferenceable, + Map<String, SpoutSpec> spouts, + Map stormConf, + String topologyOwner, List<Referenceable> dependentEntities) throws IllegalAccessException { + final ArrayList<Referenceable> inputDataSets = new ArrayList<>(); + for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) { + Serializable instance = Utils.javaDeserialize( + entry.getValue().get_spout_object().get_serialized_java(), Serializable.class); + + String simpleName = instance.getClass().getSimpleName(); + final Referenceable datasetRef = createDataSet(simpleName, topologyOwner, instance, stormConf, dependentEntities); + if (datasetRef != null) { + inputDataSets.add(datasetRef); + } + } + + topologyReferenceable.set("inputs", inputDataSets); + } + + private void addTopologyOutputs(Referenceable topologyReferenceable, + StormTopology stormTopology, String topologyOwner, + Map stormConf, List<Referenceable> dependentEntities) throws Exception { + final ArrayList<Referenceable> outputDataSets = new ArrayList<>(); + + Map<String, Bolt> bolts = stormTopology.get_bolts(); + Set<String> terminalBoltNames = StormTopologyUtil.getTerminalUserBoltNames(stormTopology); + for (String terminalBoltName : terminalBoltNames) { + Serializable instance = 
Utils.javaDeserialize(bolts.get(terminalBoltName) + .get_bolt_object().get_serialized_java(), Serializable.class); + + String dataSetType = instance.getClass().getSimpleName(); + final Referenceable datasetRef = createDataSet(dataSetType, topologyOwner, instance, stormConf, dependentEntities); + if (datasetRef != null) { + outputDataSets.add(datasetRef); + } + } + + topologyReferenceable.set("outputs", outputDataSets); + } + + private Referenceable createDataSet(String name, String topologyOwner, + Serializable instance, + Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException { + Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true); + + String clusterName = null; + Referenceable dataSetReferenceable; + // todo: need to redo this with a config driven approach + switch (name) { + case "KafkaSpout": + dataSetReferenceable = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName()); + final String topicName = config.get("KafkaSpout._spoutConfig.topic"); + dataSetReferenceable.set("topic", topicName); + dataSetReferenceable.set("uri", + config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr")); + if (StringUtils.isEmpty(topologyOwner)) { + topologyOwner = ANONYMOUS_OWNER; + } + dataSetReferenceable.set("owner", topologyOwner); + dataSetReferenceable.set("name", getKafkaTopicQualifiedName(getClusterName(stormConf), topicName)); + break; + + case "HBaseBolt": + dataSetReferenceable = new Referenceable(StormDataTypes.HBASE_TABLE.getName()); + final String hbaseTableName = config.get("HBaseBolt.tableName"); + dataSetReferenceable.set("uri", stormConf.get("hbase.rootdir")); + dataSetReferenceable.set("tableName", hbaseTableName); + dataSetReferenceable.set("owner", stormConf.get("storm.kerberos.principal")); + clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf); + //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName + dataSetReferenceable.set("name", getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, + hbaseTableName)); + break; + + case "HdfsBolt": + dataSetReferenceable = new Referenceable(StormDataTypes.HDFS_DATA_SET.getName()); + String hdfsUri = config.get("HdfsBolt.rotationActions") == null + ? 
config.get("HdfsBolt.fileNameFormat.path") + : config.get("HdfsBolt.rotationActions"); + final String hdfsPath = config.get("HdfsBolt.fsUrl") + hdfsUri; + dataSetReferenceable.set("pathURI", hdfsPath); + dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal")); + dataSetReferenceable.set("name", hdfsPath); + break; + + case "HiveBolt": + // todo: verify if hive table has everything needed to retrieve existing table + Referenceable dbReferenceable = new Referenceable("hive_db"); + String databaseName = config.get("HiveBolt.options.databaseName"); + dbReferenceable.set(HiveDataModelGenerator.NAME, databaseName); + dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName)); + dbReferenceable.set(HiveDataModelGenerator.CLUSTER_NAME, getClusterName(stormConf)); + dependentEntities.add(dbReferenceable); + clusterName = extractComponentClusterName(new HiveConf(), stormConf); + final String hiveTableName = config.get("HiveBolt.options.tableName"); + dataSetReferenceable = new Referenceable("hive_table"); + final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, + databaseName, hiveTableName); + dataSetReferenceable.set(HiveDataModelGenerator.NAME, tableQualifiedName); + dataSetReferenceable.set(HiveDataModelGenerator.DB, dbReferenceable); + dataSetReferenceable.set(HiveDataModelGenerator.TABLE_NAME, hiveTableName); + break; + + default: + // custom node - create a base dataset class with name attribute + //TODO - What should we do for custom data sets. Not sure what name we can set here? + return null; + } + dependentEntities.add(dataSetReferenceable); + + + return dataSetReferenceable; + } + + private String extractComponentClusterName(Configuration configuration, Map stormConf) { + String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null); + if (clusterName == null) { + clusterName = getClusterName(stormConf); + } + return clusterName; + } + + + private ArrayList<Referenceable> createTopologyGraph(StormTopology stormTopology, + Map<String, SpoutSpec> spouts, + Map<String, Bolt> bolts) throws Exception { + // Add graph of nodes in the topology + final Map<String, Referenceable> nodeEntities = new HashMap<>(); + addSpouts(spouts, nodeEntities); + addBolts(bolts, nodeEntities); + + addGraphConnections(stormTopology, nodeEntities); + + ArrayList<Referenceable> nodes = new ArrayList<>(); + nodes.addAll(nodeEntities.values()); + return nodes; + } + + private void addSpouts(Map<String, SpoutSpec> spouts, + Map<String, Referenceable> nodeEntities) throws IllegalAccessException { + for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) { + final String spoutName = entry.getKey(); + Referenceable spoutReferenceable = createSpoutInstance( + spoutName, entry.getValue()); + nodeEntities.put(spoutName, spoutReferenceable); + } + } + + private Referenceable createSpoutInstance(String spoutName, + SpoutSpec stormSpout) throws IllegalAccessException { + Referenceable spoutReferenceable = new Referenceable( + StormDataTypes.STORM_SPOUT.getName(), "DataProducer"); + spoutReferenceable.set("name", spoutName); + + Serializable instance = Utils.javaDeserialize( + stormSpout.get_spout_object().get_serialized_java(), Serializable.class); + spoutReferenceable.set("driverClass", instance.getClass().getName()); + + Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true); + spoutReferenceable.set("conf", flatConfigMap); + + return 
spoutReferenceable; + } + + private void addBolts(Map<String, Bolt> bolts, + Map<String, Referenceable> nodeEntities) throws IllegalAccessException { + for (Map.Entry<String, Bolt> entry : bolts.entrySet()) { + Referenceable boltInstance = createBoltInstance(entry.getKey(), entry.getValue()); + nodeEntities.put(entry.getKey(), boltInstance); + } + } + + private Referenceable createBoltInstance(String boltName, + Bolt stormBolt) throws IllegalAccessException { + Referenceable boltReferenceable = new Referenceable( + StormDataTypes.STORM_BOLT.getName(), "DataProcessor"); + + boltReferenceable.set("name", boltName); + + Serializable instance = Utils.javaDeserialize( + stormBolt.get_bolt_object().get_serialized_java(), Serializable.class); + boltReferenceable.set("driverClass", instance.getClass().getName()); + + Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true); + boltReferenceable.set("conf", flatConfigMap); + + return boltReferenceable; + } + + private void addGraphConnections(StormTopology stormTopology, + Map<String, Referenceable> nodeEntities) throws Exception { + // adds connections between spouts and bolts + Map<String, Set<String>> adjacencyMap = + StormTopologyUtil.getAdjacencyMap(stormTopology, true); + + for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) { + String nodeName = entry.getKey(); + Set<String> adjacencyList = adjacencyMap.get(nodeName); + if (adjacencyList == null || adjacencyList.isEmpty()) { + continue; + } + + // add outgoing links + Referenceable node = nodeEntities.get(nodeName); + ArrayList<String> outputs = new ArrayList<>(adjacencyList.size()); + outputs.addAll(adjacencyList); + node.set("outputs", outputs); + + // add incoming links + for (String adjacentNodeName : adjacencyList) { + Referenceable adjacentNode = nodeEntities.get(adjacentNodeName); + @SuppressWarnings("unchecked") + ArrayList<String> inputs = (ArrayList<String>) adjacentNode.get("inputs"); + if (inputs == null) { + inputs = new ArrayList<>(); + } + inputs.add(nodeName); + adjacentNode.set("inputs", inputs); + } + } + } + + public static String getKafkaTopicQualifiedName(String clusterName, String topicName) { + return String.format("%s@%s", topicName, clusterName); + } + + public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) { + return String.format("%s.%s@%s", nameSpace, tableName, clusterName); + } + + public synchronized void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException, + AtlasServiceException { + + try { + atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName()); + LOG.info("Hive data model is already registered! 
Going ahead with registration of Storm Data model"); + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + //Expected in case types do not exist + LOG.info("Registering Hive data model"); + atlasClient.createType(dataModelGenerator.getModelAsJson()); + } else { + throw ase; + } + } + + + try { + atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName()); + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + LOG.info("Registering Storm/Kafka data model"); + StormDataModel.main(new String[]{}); + TypesDef typesDef = StormDataModel.typesDef(); + String stormTypesAsJSON = TypesSerialization.toJson(typesDef); + LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON); + atlasClient.createType(stormTypesAsJSON); + } + } + typesRegistered = true; + } + + private String getClusterName(Map stormConf) { + String clusterName = AtlasConstants.DEFAULT_CLUSTER_NAME; + if (stormConf.containsKey(AtlasConstants.CLUSTER_NAME_KEY)) { + clusterName = (String)stormConf.get(AtlasConstants.CLUSTER_NAME_KEY); + } + return clusterName; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java new file mode 100644 index 0000000..b352a49 --- /dev/null +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.storm.hook; + +import backtype.storm.generated.Bolt; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.generated.Grouping; +import backtype.storm.generated.StormTopology; +import com.google.common.base.Joiner; +import org.slf4j.Logger; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A storm topology utility class. 
+ */ +public final class StormTopologyUtil { + + private StormTopologyUtil() { + } + + public static Set<String> getTerminalUserBoltNames(StormTopology topology) throws Exception { + Set<String> terminalBolts = new HashSet<>(); + Set<String> inputs = new HashSet<>(); + for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) { + String name = entry.getKey(); + Set<GlobalStreamId> inputsForBolt = entry.getValue().get_common().get_inputs().keySet(); + if (!isSystemComponent(name)) { + for (GlobalStreamId streamId : inputsForBolt) { + inputs.add(streamId.get_componentId()); + } + } + } + + for (String boltName : topology.get_bolts().keySet()) { + if (!isSystemComponent(boltName) && !inputs.contains(boltName)) { + terminalBolts.add(boltName); + } + } + + return terminalBolts; + } + + public static boolean isSystemComponent(String componentName) { + return componentName.startsWith("__"); + } + + public static Map<String, Set<String>> getAdjacencyMap(StormTopology topology, + boolean removeSystemComponent) + throws Exception { + Map<String, Set<String>> adjacencyMap = new HashMap<>(); + + for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) { + String boltName = entry.getKey(); + Map<GlobalStreamId, Grouping> inputs = entry.getValue().get_common().get_inputs(); + for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) { + String inputComponentId = input.getKey().get_componentId(); + Set<String> components = adjacencyMap.containsKey(inputComponentId) + ? adjacencyMap.get(inputComponentId) : new HashSet<String>(); + components.add(boltName); + components = removeSystemComponent ? removeSystemComponents(components) + : components; + if ((removeSystemComponent && !isSystemComponent(inputComponentId)) || + !removeSystemComponent) { + adjacencyMap.put(inputComponentId, components); + } + } + } + + return adjacencyMap; + } + + public static Set<String> removeSystemComponents(Set<String> components) { + Set<String> userComponents = new HashSet<>(); + for (String component : components) { + if (!isSystemComponent(component)) + userComponents.add(component); + } + + return userComponents; + } + + private static final Set<Class> WRAPPER_TYPES = new HashSet<Class>() {{ + add(Boolean.class); + add(Character.class); + add(Byte.class); + add(Short.class); + add(Integer.class); + add(Long.class); + add(Float.class); + add(Double.class); + add(Void.class); + add(String.class); + }}; + + public static boolean isWrapperType(Class clazz) { + return WRAPPER_TYPES.contains(clazz); + } + + public static boolean isCollectionType(Class clazz) { + return Collection.class.isAssignableFrom(clazz); + } + + public static boolean isMapType(Class clazz) { + return Map.class.isAssignableFrom(clazz); + } + + public static Map<String, String> getFieldValues(Object instance, + boolean prependClassName) + throws IllegalAccessException { + Class clazz = instance.getClass(); + Map<String, String> output = new HashMap<>(); + for (Class<?> c = clazz; c != null; c = c.getSuperclass()) { + Field[] fields = c.getDeclaredFields(); + for (Field field : fields) { + if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) { + continue; + } + + String key; + if (prependClassName) { + key = String.format("%s.%s", clazz.getSimpleName(), field.getName()); + } else { + key = field.getName(); + } + + boolean accessible = field.isAccessible(); + if (!accessible) { + field.setAccessible(true); + } + Object fieldVal = field.get(instance); + if (fieldVal == null) { + continue; + } else if 
(fieldVal.getClass().isPrimitive() || + isWrapperType(fieldVal.getClass())) { + if (toString(fieldVal, false).isEmpty()) continue; + output.put(key, toString(fieldVal, false)); + } else if (isMapType(fieldVal.getClass())) { + //TODO: check if it makes more sense to just stick to json + // like structure instead of a flatten output. + Map map = (Map) fieldVal; + for (Object entry : map.entrySet()) { + Object mapKey = ((Map.Entry) entry).getKey(); + Object mapVal = ((Map.Entry) entry).getValue(); + + String keyStr = getString(mapKey, false); + String valStr = getString(mapVal, false); + if ((valStr == null) || (valStr.isEmpty())) { + continue; + } else { + output.put(String.format("%s.%s", key, keyStr), valStr); + } + } + } else if (isCollectionType(fieldVal.getClass())) { + //TODO check if it makes more sense to just stick to + // json like structure instead of a flatten output. + Collection collection = (Collection) fieldVal; + if (collection.size()==0) continue; + String outStr = ""; + for (Object o : collection) { + outStr += getString(o, false) + ","; + } + if (outStr.length() > 0) { + outStr = outStr.substring(0, outStr.length() - 1); + } + output.put(key, String.format("%s", outStr)); + } else { + Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false); + for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) { + output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue()); + } + } + if (!accessible) { + field.setAccessible(false); + } + } + } + return output; + } + + private static String getString(Object instance, + boolean wrapWithQuote) throws IllegalAccessException { + if (instance == null) { + return null; + } else if (instance.getClass().isPrimitive() || isWrapperType(instance.getClass())) { + return toString(instance, wrapWithQuote); + } else { + return getString(getFieldValues(instance, false), wrapWithQuote); + } + } + + private static String getString(Map<String, String> flattenFields, boolean wrapWithQuote) { + String outStr = ""; + if (flattenFields != null && !flattenFields.isEmpty()) { + if (wrapWithQuote) { + outStr += "\"" + Joiner.on(",").join(flattenFields.entrySet()) + "\","; + } else { + outStr += Joiner.on(",").join(flattenFields.entrySet()) + ","; + } + } + if (outStr.length() > 0) { + outStr = outStr.substring(0, outStr.length() - 1); + } + return outStr; + } + + private static String toString(Object instance, boolean wrapWithQuote) { + if (instance instanceof String) + if (wrapWithQuote) + return "\"" + instance + "\""; + else + return instance.toString(); + else + return instance.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java new file mode 100644 index 0000000..2463a77 --- /dev/null +++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.storm.hook; + +import backtype.storm.ILocalCluster; +import backtype.storm.generated.StormTopology; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.hive.model.HiveDataModelGenerator; +import org.apache.atlas.storm.model.StormDataModel; +import org.apache.atlas.storm.model.StormDataTypes; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.json.TypesSerialization; +import org.apache.commons.configuration.Configuration; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test +public class StormAtlasHookIT { + + public static final Logger LOG = LoggerFactory.getLogger(StormAtlasHookIT.class); + + private static final String ATLAS_URL = "http://localhost:21000/"; + private static final String TOPOLOGY_NAME = "word-count"; + + private ILocalCluster stormCluster; + private AtlasClient atlasClient; + + @BeforeClass + public void setUp() throws Exception { + // start a local storm cluster + stormCluster = StormTestUtil.createLocalStormCluster(); + LOG.info("Created a storm local cluster"); + + Configuration configuration = ApplicationProperties.get(); + atlasClient = new AtlasClient(configuration.getString("atlas.rest.address", ATLAS_URL)); + } + + @AfterClass + public void tearDown() throws Exception { + LOG.info("Shutting down storm local cluster"); + stormCluster.shutdown(); + + atlasClient = null; + } + + @Test + public void testCreateDataModel() throws Exception { + StormDataModel.main(new String[]{}); + TypesDef stormTypesDef = StormDataModel.typesDef(); + + String stormTypesAsJSON = TypesSerialization.toJson(stormTypesDef); + LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON); + + new StormAtlasHook().registerDataModel(new HiveDataModelGenerator()); + + // verify types are registered + for (StormDataTypes stormDataType : StormDataTypes.values()) { + Assert.assertNotNull(atlasClient.getType(stormDataType.getName())); + } + } + + @Test (dependsOnMethods = "testCreateDataModel") + public void testAddEntities() throws Exception { + StormTopology stormTopology = StormTestUtil.createTestTopology(); + StormTestUtil.submitTopology(stormCluster, TOPOLOGY_NAME, stormTopology); + LOG.info("Submitted topology {}", TOPOLOGY_NAME); + + // todo: test if topology metadata is registered in atlas + String guid = getTopologyGUID(); + Assert.assertNotNull(guid); + LOG.info("GUID is {}", guid); + + Referenceable topologyReferenceable = atlasClient.getEntity(guid); + Assert.assertNotNull(topologyReferenceable); + } + + private String getTopologyGUID() throws Exception { + LOG.debug("Searching for topology {}", 
TOPOLOGY_NAME); + String query = String.format("from %s where name = \"%s\"", + StormDataTypes.STORM_TOPOLOGY.getName(), TOPOLOGY_NAME); + + JSONArray results = atlasClient.search(query); + JSONObject row = results.getJSONObject(0); + + return row.has("$id$") ? row.getJSONObject("$id$").getString("id"): null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java new file mode 100644 index 0000000..51840a5 --- /dev/null +++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.storm.hook; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hive.model.HiveDataModelGenerator; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.storm.model.StormDataTypes; +import org.testng.annotations.Test; + +import static org.mockito.Matchers.contains; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Test +public class StormAtlasHookTest { + + @Test + public void testStormRegistersHiveDataModelIfNotPresent() throws AtlasException, AtlasServiceException { + AtlasClient atlasClient = mock(AtlasClient.class); + HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class); + AtlasServiceException atlasServiceException = mock(AtlasServiceException.class); + when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND); + when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenThrow(atlasServiceException); + String hiveModel = "{hive_model_as_json}"; + when(dataModelGenerator.getModelAsJson()).thenReturn(hiveModel); + + StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient); + stormAtlasHook.registerDataModel(dataModelGenerator); + + verify(atlasClient).createType(hiveModel); + } + + @Test + public void testStormRegistersStormModelIfNotPresent() throws AtlasServiceException, AtlasException { + AtlasClient atlasClient = mock(AtlasClient.class); + HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class); + when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenReturn("hive_process_definition"); + AtlasServiceException 
atlasServiceException = mock(AtlasServiceException.class); + when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND); + when(atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName())).thenThrow(atlasServiceException); + + StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient); + stormAtlasHook.registerDataModel(dataModelGenerator); + + verify(atlasClient).createType(contains("storm_topology")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java new file mode 100644 index 0000000..1e13f56 --- /dev/null +++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormTestUtil.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.storm.hook; + +import backtype.storm.Config; +import backtype.storm.ILocalCluster; +import backtype.storm.Testing; +import backtype.storm.generated.StormTopology; +import backtype.storm.testing.TestGlobalCount; +import backtype.storm.testing.TestWordCounter; +import backtype.storm.testing.TestWordSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.utils.Utils; + +import java.util.HashMap; + +/** + * A utility class to create a test topology.
+ */ +final class StormTestUtil { + + private StormTestUtil() { + } + + public static ILocalCluster createLocalStormCluster() { + // start a local storm cluster + HashMap<String,Object> localClusterConf = new HashMap<>(); + localClusterConf.put("nimbus-daemon", true); + return Testing.getLocalCluster(localClusterConf); + } + + public static StormTopology createTestTopology() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("words", new TestWordSpout(), 10); + builder.setBolt("count", new TestWordCounter(), 3).shuffleGrouping("words"); + builder.setBolt("globalCount", new TestGlobalCount(), 2).shuffleGrouping("count"); + + return builder.createTopology(); + } + + public static Config submitTopology(ILocalCluster stormCluster, String topologyName, + StormTopology stormTopology) throws Exception { + Config stormConf = new Config(); + stormConf.putAll(Utils.readDefaultConfig()); + stormConf.setDebug(true); + stormConf.setMaxTaskParallelism(3); + stormConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN, + org.apache.atlas.storm.hook.StormAtlasHook.class.getName()); + + stormCluster.submitTopology(topologyName, stormConf, stormTopology); + + Thread.sleep(10000); + return stormConf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/common/src/main/java/org/apache/atlas/AtlasConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java b/common/src/main/java/org/apache/atlas/AtlasConstants.java new file mode 100644 index 0000000..d590d6d --- /dev/null +++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas; + +public interface AtlasConstants { + String CLUSTER_NAME_KEY = "atlas.cluster.name"; + String DEFAULT_CLUSTER_NAME = "primary"; + String CLUSTER_NAME_ATTRIBUTE = "clusterName"; + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/distro/src/main/assemblies/standalone-package.xml ---------------------------------------------------------------------- diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml index b80a0ad..88d0e60 100755 --- a/distro/src/main/assemblies/standalone-package.xml +++ b/distro/src/main/assemblies/standalone-package.xml @@ -110,6 +110,12 @@ <directory>../addons/sqoop-bridge/target/dependency/hook</directory> <outputDirectory>hook</outputDirectory> </fileSet> + + <!-- addons/storm --> + <fileSet> + <directory>../addons/storm-bridge/target/dependency/hook</directory> + <outputDirectory>hook</outputDirectory> + </fileSet> </fileSets> <files> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java new file mode 100644 index 0000000..4b1c78c --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hook; + +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.commons.configuration.Configuration; +import org.codehaus.jettison.json.JSONArray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + + +/** + * A base class for atlas hooks. + */ +public abstract class AtlasHook { + + private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class); + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + + public static final String ATLAS_ENDPOINT = "atlas.rest.address"; + + protected final AtlasClient atlasClient; + + /** + * Atlas client-side configuration properties, shared by all hook implementations.
+ */ + protected static Configuration atlasProperties; + + @Inject + protected static NotificationInterface notifInterface; + + static { + try { + atlasProperties = ApplicationProperties.get(ApplicationProperties.CLIENT_PROPERTIES); + } catch (Exception e) { + LOG.info("Failed to load application properties", e); + } + + Injector injector = Guice.createInjector(new NotificationModule()); + notifInterface = injector.getInstance(NotificationInterface.class); + + LOG.info("Created Atlas Hook"); + } + + public AtlasHook() { + this(new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT, DEFAULT_ATLAS_URL))); + } + + public AtlasHook(AtlasClient atlasClient) { + this.atlasClient = atlasClient; + //TODO - take care of passing in - ugi, doAsUser for secure cluster + } + + protected abstract String getNumberOfRetriesPropertyKey(); + + protected void notifyEntities(Collection<Referenceable> entities) { + JSONArray entitiesArray = new JSONArray(); + + for (Referenceable entity : entities) { + LOG.info("Adding entity for type: {}", entity.getTypeName()); + final String entityJson = InstanceSerialization.toJson(entity, true); + entitiesArray.put(entityJson); + } + + List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + hookNotificationMessages.add(new HookNotification.EntityCreateRequest(entitiesArray)); + notifyEntities(hookNotificationMessages); + } + + /** + * Notify atlas of the entity through message. The entity can be a + * complex entity with reference to other entities. + * De-duping of entities is done on server side depending on the + * unique attribute on the entities. + * + * @param entities entities + */ + protected void notifyEntities(List<HookNotification.HookNotificationMessage> entities) { + final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3); + final String message = entities.toString(); + + int numRetries = 0; + while (true) { + try { + notifInterface.send(NotificationInterface.NotificationType.HOOK, entities); + return; + } catch(Exception e) { + numRetries++; + if(numRetries < maxRetries) { + LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e); + } else { + LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", + message, maxRetries, e); + break; } } } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java index c97c726..1d73af5 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java @@ -32,8 +32,7 @@ public class KafkaNotificationProvider implements Provider<KafkaNotification> { public KafkaNotification get() { try { Configuration applicationProperties = ApplicationProperties.get(); - KafkaNotification instance = new KafkaNotification(applicationProperties); - return instance; + return new KafkaNotification(applicationProperties); } catch(AtlasException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b77d7c7b/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 9dfd4ff..fe02058 100644 --- a/release-log.txt +++ b/release-log.txt @@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES: ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-183 Add a Hook in Storm to post the topology metadata (svenkat,yhemanth via shwethags) ATLAS-370 Implement deleteEntities at repository level (dkantor via shwethags) ATLAS-406 Resizing lineage window - should be an anchor on a corner - like ppt for graphic (sanjayp via shwethags) ATLAS-432 QuickStart lineage is broken (yhemanth via shwethags)
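----------------------------------------------------------------------
For anyone trying the new hook: StormTestUtil above wires it in through Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN. A minimal client-side sketch of the same wiring against a real cluster could look as follows; the topology layout and the name "word-count" are placeholders borrowed from the test, and it assumes the storm-bridge hook jar and the Atlas client configuration are on the submitter's classpath.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestGlobalCount;
import backtype.storm.testing.TestWordCounter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import org.apache.atlas.storm.hook.StormAtlasHook;

public class SubmitWithAtlasHook {
    public static void main(String[] args) throws Exception {
        // Placeholder topology, mirroring StormTestUtil.createTestTopology().
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new TestWordSpout(), 10);
        builder.setBolt("count", new TestWordCounter(), 3).shuffleGrouping("words");
        builder.setBolt("globalCount", new TestGlobalCount(), 2).shuffleGrouping("count");

        Config stormConf = new Config();
        // Storm instantiates the class named under this key and calls
        // ISubmitterHook.notify(topologyInfo, stormConf, stormTopology) once
        // the topology is submitted; that is where StormAtlasHook collects
        // and publishes the topology metadata.
        stormConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN,
                StormAtlasHook.class.getName());

        StormSubmitter.submitTopology("word-count", stormConf, builder.createTopology());
    }
}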
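The refactoring that moves HiveHook's retry loop into the new AtlasHook base class lets any component publish entities the same way: subclass AtlasHook, supply a retry property key, and hand Referenceable instances to notifyEntities(). A hypothetical sketch, not part of this commit; the component name, the type name "my_dataset", the callback, and the property key are all illustrative.

import java.util.Collections;

import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.typesystem.Referenceable;

public class MyComponentAtlasHook extends AtlasHook {

    // Hypothetical property key, following the atlas.hook.<component>.numRetries pattern.
    private static final String HOOK_NUM_RETRIES = "atlas.hook.mycomponent.numRetries";

    @Override
    protected String getNumberOfRetriesPropertyKey() {
        return HOOK_NUM_RETRIES;
    }

    // Hypothetical callback fired by the host component.
    public void onDataSetCreated(String dataSetName) {
        Referenceable dataSet = new Referenceable("my_dataset");
        dataSet.set("name", dataSetName);
        // notifyEntities() wraps the entities in an EntityCreateRequest and
        // sends them through the injected NotificationInterface, retrying up
        // to the configured number of times before giving up.
        notifyEntities(Collections.singletonList(dataSet));
    }
}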
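For reference, the qualified-name helpers in StormAtlasHook produce values like the following, assuming the default cluster name "primary" from AtlasConstants; the topic and table names here are hypothetical.

getKafkaTopicQualifiedName("primary", "orders")             returns "orders@primary"
getHbaseTableQualifiedName("primary", "default", "events")  returns "default.events@primary"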
