ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/086b4a3e Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/086b4a3e Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/086b4a3e Branch: refs/heads/master Commit: 086b4a3ee0480d37f9aa66fb878c9dc978aaa043 Parents: 70d5498 Author: Shwetha GS <[email protected]> Authored: Wed Dec 23 14:40:49 2015 +0530 Committer: Shwetha GS <[email protected]> Committed: Wed Dec 23 14:41:00 2015 +0530 ---------------------------------------------------------------------- addons/falcon-bridge/pom.xml | 274 ++++++++++++++ .../apache/atlas/falcon/hook/FalconHook.java | 356 ++++++++++++++++++ .../falcon/model/FalconDataModelGenerator.java | 153 ++++++++ .../atlas/falcon/model/FalconDataTypes.java | 40 +++ .../org/apache/falcon/atlas/Util/EventUtil.java | 68 ++++ .../apache/falcon/atlas/event/FalconEvent.java | 65 ++++ .../atlas/publisher/FalconEventPublisher.java | 41 +++ .../falcon/atlas/service/AtlasService.java | 115 ++++++ .../apache/atlas/falcon/hook/FalconHookIT.java | 205 +++++++++++ .../src/test/resources/cluster.xml | 45 +++ .../falcon-bridge/src/test/resources/feed.xml | 38 ++ .../src/test/resources/hive-site.xml | 53 +++ .../src/test/resources/process.xml | 53 +++ .../src/test/resources/startup.properties | 20 ++ .../atlas/hive/bridge/HiveMetaStoreBridge.java | 9 +- .../hive/model/HiveDataModelGenerator.java | 2 +- addons/sqoop-bridge/pom.xml | 357 +++++++++++++++++++ .../org/apache/atlas/sqoop/hook/SqoopHook.java | 230 ++++++++++++ .../sqoop/model/SqoopDataModelGenerator.java | 180 ++++++++++ .../atlas/sqoop/model/SqoopDataTypes.java | 34 ++ .../apache/atlas/sqoop/hook/SqoopHookIT.java | 124 +++++++ .../src/test/resources/hive-site.xml | 53 +++ .../src/test/resources/sqoop-site.xml | 190 ++++++++++ distro/src/bin/atlas_start.py | 2 +- .../src/main/assemblies/standalone-package.xml | 12 + 
distro/src/test/python/scripts/TestMetadata.py | 4 +- docs/src/site/twiki/Architecture.twiki | 3 +- docs/src/site/twiki/Bridge-Falcon.twiki | 34 ++ docs/src/site/twiki/Bridge-Sqoop.twiki | 37 ++ docs/src/site/twiki/index.twiki | 2 + .../notification/hook/HookNotification.java | 7 +- pom.xml | 20 ++ release-log.txt | 1 + .../main/resources/atlas-application.properties | 2 +- 34 files changed, 2818 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml new file mode 100644 index 0000000..73ef265 --- /dev/null +++ b/addons/falcon-bridge/pom.xml @@ -0,0 +1,274 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>apache-atlas</artifactId> + <groupId>org.apache.atlas</groupId> + <version>0.7-incubating-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + <artifactId>falcon-bridge</artifactId> + <description>Apache Atlas Falcon Bridge Module</description> + <name>Apache Atlas Falcon Bridge</name> + <packaging>jar</packaging> + + <properties> + <falcon.version>0.8</falcon.version> + </properties> + + <dependencies> + <!-- Logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-typesystem</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-notification</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-common</artifactId> + <version>${falcon.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>hive-bridge</artifactId> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + 
<id>copy-hook-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/dependency/hook/falcon</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>hive-bridge</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.json4s</groupId> + <artifactId>json4s-native_2.10</artifactId> + <version>${json.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.json4s</groupId> + <artifactId>json4s-core_2.10</artifactId> + <version>${json.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.json4s</groupId> + <artifactId>json4s-ast_2.10</artifactId> + <version>${json.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-client</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-typesystem</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-notification</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-common</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> 
+ <version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.scala-lang</groupId> + <artifactId>scalap</artifactId> + <version>${scala.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.google.inject.extensions</groupId> + <artifactId>guice-multibindings</artifactId> + <version>${guice.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-maven-plugin</artifactId> + <configuration> + <!--<skip>${skipTests}</skip>--> + <!--only skip int tests --> + <httpConnector> + <port>31000</port> + <idleTimeout>60000</idleTimeout> + </httpConnector> + <war>../../webapp/target/atlas-webapp-${project.version}.war</war> + <daemon>true</daemon> + <webApp> + <contextPath>/</contextPath> + <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + </webApp> + <useTestScope>true</useTestScope> + <systemProperties> + <systemProperty> + <name>log4j.configuration</name> + <value>atlas-log4j.xml</value> + </systemProperty> + <systemProperty> + <name>atlas.log.dir</name> + <value>${project.build.directory}/logs</value> + </systemProperty> + <systemProperty> + <name>atlas.data</name> + <value>${project.build.directory}/data</value> + </systemProperty> + </systemProperties> + <stopKey>atlas-stop</stopKey> + <stopPort>31001</stopPort> + </configuration> + <executions> + <execution> + <id>start-jetty</id> + <phase>pre-integration-test</phase> 
+ <goals> + <goal>deploy-war</goal> + </goals> + <configuration> + <daemon>true</daemon> + </configuration> + </execution> + <execution> + <id>stop-jetty</id> + <phase>post-integration-test</phase> + <goals> + <goal>stop</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-module-twiki</artifactId> + <version>1.3</version> + </dependency> + </dependencies> + <executions> + <execution> + <goals> + <goal>site</goal> + </goals> + <phase>prepare-package</phase> + </execution> + </executions> + <configuration> + <generateProjectInfo>false</generateProjectInfo> + <generateReports>false</generateReports> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java new file mode 100644 index 0000000..05765bb --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.falcon.hook; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.falcon.model.FalconDataModelGenerator; +import org.apache.atlas.falcon.model.FalconDataTypes; +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.hive.model.HiveDataModelGenerator; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.atlas.Util.EventUtil; +import org.apache.falcon.atlas.event.FalconEvent; +import org.apache.falcon.atlas.publisher.FalconEventPublisher; +import org.apache.falcon.entity.CatalogStorage; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.CatalogTable; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Cluster; +import 
org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Falcon hook sends lineage information to the Atlas Service. + */ +public class FalconHook extends FalconEventPublisher { + private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class); + + public static final String CONF_PREFIX = "atlas.hook.falcon."; + private static final String MIN_THREADS = CONF_PREFIX + "minThreads"; + private static final String MAX_THREADS = CONF_PREFIX + "maxThreads"; + private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime"; + public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize"; + public static final String CONF_SYNC = CONF_PREFIX + "synchronous"; + + public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + + public static final String ATLAS_ENDPOINT = "atlas.rest.address"; + + private static AtlasClient atlasClient; + + // wait time determines how long we wait before we exit the jvm on + // shutdown. Pending requests after that will not be sent. 
+ private static final int WAIT_TIME = 3; + private static ExecutorService executor; + + private static final int minThreadsDefault = 5; + private static final int maxThreadsDefault = 5; + private static final long keepAliveTimeDefault = 10; + private static final int queueSizeDefault = 10000; + + private static Configuration atlasProperties; + @Inject + private static NotificationInterface notifInterface; + + public static boolean typesRegistered = false; + + private static boolean sync; + + private static ConfigurationStore STORE; + + static { + try { + atlasProperties = ApplicationProperties.get(); + + // initialize the async facility to process hook calls. We don't + // want to do this inline since it adds plenty of overhead for the query. + int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault); + int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault); + long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault); + int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault); + sync = atlasProperties.getBoolean(CONF_SYNC, false); + + executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(queueSize), + new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build()); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + executor.shutdown(); + executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); + executor = null; + } catch (InterruptedException ie) { + LOG.info("Interrupt received in shutdown."); + } + // shutdown client + } + }); + atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT), + EventUtil.getUgi(), EventUtil.getUgi().getShortUserName()); + + STORE = ConfigurationStore.get(); + } catch (Exception e) { + LOG.info("Caught exception initializing the falcon hook.", e); + } + + Injector injector = Guice.createInjector(new NotificationModule()); + 
+ notifInterface = injector.getInstance(NotificationInterface.class); + + LOG.info("Created Atlas Hook for Falcon"); + } + + @Override + public void publish(final Data data) throws Exception { + final FalconEvent event = data.getEvent(); + if (sync) { + fireAndForget(event); + } else { + executor.submit(new Runnable() { + @Override + public void run() { + try { + fireAndForget(event); + } catch (Throwable e) { + LOG.info("Atlas hook failed", e); + } + } + }); + } + } + + private void fireAndForget(FalconEvent event) throws Exception { + LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation()); + + if (!typesRegistered) { + registerFalconDataModel(); + typesRegistered = true; + } + + notifyEntity(createEntities(event)); + } + + private List<Referenceable> createEntities(FalconEvent event) throws Exception { + switch (event.getOperation()) { + case ADD_PROCESS: + return createProcessInstance((Process) event.getEntity(), event.getUser(), event.getTimestamp()); + } + + return null; + } + + /** + * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities. + * De-duping of entities is done on server side depending on the unique attribute of the entity. + * + * @param entities entities to add + */ + private void notifyEntity(List<Referenceable> entities) { + int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3); + String message = entities.toString(); + + int numRetries = 0; + while (true) { + try { + notifInterface.send(NotificationInterface.NotificationType.HOOK, + new HookNotification.EntityCreateRequest(entities)); + return; + } catch (Exception e) { + numRetries++; + if (numRetries < maxRetries) { + LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e); + } else { + LOG.error("Failed to notify atlas for entity {} after {} retries. 
Quitting", message, + maxRetries, e); + break; + } + } + } + } + + + /** + + * Creates process entity + + * + + * @param event process entity event + + * @return process instance reference + + */ + public List<Referenceable> createProcessInstance(Process process, String user, long timestamp) throws Exception { + LOG.info("Creating process Instance : {}", process.getName()); + + // The requirement is for each cluster, create a process entity with name + // clustername.processname + List<Referenceable> entities = new ArrayList<>(); + + if (process.getClusters() != null) { + + for (Cluster processCluster : process.getClusters().getClusters()) { + org.apache.falcon.entity.v0.cluster.Cluster cluster = STORE.get(EntityType.CLUSTER, processCluster.getName()); + + List<Referenceable> inputs = new ArrayList<>(); + if (process.getInputs() != null) { + for (Input input : process.getInputs().getInputs()) { + List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed()); + entities.addAll(clusterInputs); + inputs.add(clusterInputs.get(clusterInputs.size() -1 )); + } + } + + List<Referenceable> outputs = new ArrayList<>(); + if (process.getOutputs() != null) { + for (Output output : process.getOutputs().getOutputs()) { + List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed()); + entities.addAll(clusterOutputs); + outputs.add(clusterOutputs.get(clusterOutputs.size() -1 )); + } + } + + if (!inputs.isEmpty() || !outputs.isEmpty()) { + Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); + processEntity.set(FalconDataModelGenerator.NAME, String.format("%s@%s", process.getName(), + cluster.getName())); + processEntity.set(FalconDataModelGenerator.PROCESS_NAME, process.getName()); + + processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp); + if (!inputs.isEmpty()) { + processEntity.set(FalconDataModelGenerator.INPUTS, inputs); + } + if (!outputs.isEmpty()) { + 
processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs); + } + processEntity.set(FalconDataModelGenerator.USER, user); + + if (StringUtils.isNotEmpty(process.getTags())) { + processEntity.set(FalconDataModelGenerator.TAGS, + EventUtil.convertKeyValueStringToMap(process.getTags())); + } + entities.add(processEntity); + } + + } + } + + return entities; + } + + private List<Referenceable> getInputOutputEntity(org.apache.falcon.entity.v0.cluster.Cluster cluster, String feedName) throws Exception { + Feed feed = STORE.get(EntityType.FEED, feedName); + org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName()); + + final CatalogTable table = getTable(feedCluster, feed); + if (table != null) { + CatalogStorage storage = new CatalogStorage(cluster, table); + return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(), + storage.getTable().toLowerCase()); + } + + return null; + } + + private CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) { + // check if table is overridden in cluster + if (cluster.getTable() != null) { + return cluster.getTable(); + } + + return feed.getTable(); + } + + private Referenceable createHiveDatabaseInstance(String clusterName, String dbName) + throws Exception { + Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); + dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName); + dbRef.set(HiveDataModelGenerator.NAME, dbName); + dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); + return dbRef; + } + + private List<Referenceable> createHiveTableInstance(String clusterName, String dbName, String tableName) throws Exception { + List<Referenceable> entities = new ArrayList<>(); + Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName); + entities.add(dbRef); + + Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); + 
tableRef.set(HiveDataModelGenerator.NAME, + HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); + tableRef.set(HiveDataModelGenerator.TABLE_NAME, tableName); + tableRef.set(HiveDataModelGenerator.DB, dbRef); + entities.add(tableRef); + + return entities; + } + + public synchronized void registerFalconDataModel() throws Exception { + if (isDataModelAlreadyRegistered()) { + LOG.info("Falcon data model is already registered!"); + return; + } + + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties, + UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser()); + hiveMetaStoreBridge.registerHiveDataModel(); + + FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator(); + LOG.info("Registering Falcon data model"); + atlasClient.createType(dataModelGenerator.getModelAsJson()); + } + + private boolean isDataModelAlreadyRegistered() throws Exception { + try { + atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); + LOG.info("Hive data model is already registered!"); + return true; + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + return false; + } + throw ase; + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java new file mode 100644 index 0000000..ac9dd85 --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.falcon.model; + +import com.google.common.collect.ImmutableList; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.json.TypesSerialization; +import org.apache.atlas.typesystem.types.AttributeDefinition; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.EnumType; +import org.apache.atlas.typesystem.types.EnumTypeDefinition; +import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.StructType; +import org.apache.atlas.typesystem.types.StructTypeDefinition; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.utils.TypesUtil; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Utility that generates falcon data model. 
+ */ +public class FalconDataModelGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(FalconDataModelGenerator.class); + + private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions; + private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap; + private final Map<String, StructTypeDefinition> structTypeDefinitionMap; + + public static final String NAME = "name"; + public static final String PROCESS_NAME = "processName"; + public static final String TIMESTAMP = "timestamp"; + public static final String USER = "owned-by"; + public static final String TAGS = "tag-classification"; + + // multiple inputs and outputs for process + public static final String INPUTS = "inputs"; + public static final String OUTPUTS = "outputs"; + + + public FalconDataModelGenerator() { + classTypeDefinitions = new HashMap<>(); + enumTypeDefinitionMap = new HashMap<>(); + structTypeDefinitionMap = new HashMap<>(); + } + + public void createDataModel() throws AtlasException { + LOG.info("Generating the Falcon Data Model"); + createProcessEntityClass(); + + } + + private TypesDef getTypesDef() { + return TypesUtil.getTypesDef(getEnumTypeDefinitions(), getStructTypeDefinitions(), getTraitTypeDefinitions(), + getClassTypeDefinitions()); + } + + public String getDataModelAsJSON() { + return TypesSerialization.toJson(getTypesDef()); + } + + private ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() { + return ImmutableList.copyOf(enumTypeDefinitionMap.values()); + } + + private ImmutableList<StructTypeDefinition> getStructTypeDefinitions() { + return ImmutableList.copyOf(structTypeDefinitionMap.values()); + } + + private ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() { + return ImmutableList.copyOf(classTypeDefinitions.values()); + } + + private ImmutableList<HierarchicalTypeDefinition<TraitType>> getTraitTypeDefinitions() { + return ImmutableList.of(); + } + + + private void 
createProcessEntityClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(PROCESS_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + // map of tags + new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null),}; + + HierarchicalTypeDefinition<ClassType> definition = + new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), + ImmutableList.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); + } + + + + public String getModelAsJson() throws AtlasException { + createDataModel(); + return getDataModelAsJSON(); + } + + public static void main(String[] args) throws Exception { + FalconDataModelGenerator falconDataModelGenerator = new FalconDataModelGenerator(); + System.out.println("falconDataModelAsJSON = " + falconDataModelGenerator.getModelAsJson()); + + TypesDef typesDef = falconDataModelGenerator.getTypesDef(); + for (EnumTypeDefinition enumType : typesDef.enumTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - values %s", enumType.name, EnumType.class.getSimpleName(), + Arrays.toString(enumType.enumValues))); + } + for (StructTypeDefinition structType : typesDef.structTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - attributes %s", structType.typeName, StructType.class.getSimpleName(), + Arrays.toString(structType.attributeDefinitions))); + } + for (HierarchicalTypeDefinition<ClassType> classType : 
typesDef.classTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName, ClassType.class.getSimpleName(), + StringUtils.join(classType.superTypes, ","), Arrays.toString(classType.attributeDefinitions))); + } + for (HierarchicalTypeDefinition<TraitType> traitType : typesDef.traitTypesAsJavaList()) { + System.out.println(String.format("%s(%s) - %s", traitType.typeName, TraitType.class.getSimpleName(), + Arrays.toString(traitType.attributeDefinitions))); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java new file mode 100644 index 0000000..f1f350b --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.falcon.model; + +/** + * Falcon Data Types for model and bridge. + */ +public enum FalconDataTypes { + + + FALCON_PROCESS_ENTITY("falcon_process"), + ; + + private final String name; + + FalconDataTypes(java.lang.String name) { + this.name = name; + } + + public String getName() { + return name; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java new file mode 100644 index 0000000..7f67407 --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.atlas.Util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.security.CurrentUser; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Falcon event util + */ +public final class EventUtil { + private static final Logger LOG = LoggerFactory.getLogger(EventUtil.class); + + private EventUtil() {} + + + public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) { + if (StringUtils.isBlank(keyValueString)) { + return null; + } + + Map<String, String> keyValueMap = new HashMap<>(); + + String[] tags = keyValueString.split(","); + for (String tag : tags) { + int index = tag.indexOf("="); + String tagKey = tag.substring(0, index); + String tagValue = tag.substring(index + 1, tag.length()); + keyValueMap.put(tagKey, tagValue); + } + return keyValueMap; + } + + + public static UserGroupInformation getUgi() throws FalconException { + UserGroupInformation ugi; + try { + ugi = CurrentUser.getAuthenticatedUGI(); + } catch (IOException ioe) { + throw new FalconException(ioe); + } + return ugi; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java new file mode 100644 index 0000000..e587e73 --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.atlas.event; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Falcon event to interface with Atlas Service. + */ +public class FalconEvent { + protected String user; + protected UserGroupInformation ugi; + protected OPERATION operation; + protected long timestamp; + protected Entity entity; + + public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) { + this.user = doAsUser; + this.ugi = ugi; + this.operation = falconOperation; + this.timestamp = timestamp; + this.entity = entity; + } + + public enum OPERATION { + ADD_PROCESS, UPDATE_PROCESS + } + + public String getUser() { + return user; + } + + public UserGroupInformation getUgi() { + return ugi; + } + + public OPERATION getOperation() { + return operation; + } + + public long getTimestamp() { + return timestamp; + } + + public Entity getEntity() { + return entity; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java ---------------------------------------------------------------------- diff --git 
a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java new file mode 100644 index 0000000..3522339 --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.atlas.publisher; + + +import org.apache.falcon.atlas.event.FalconEvent; + +/** + * Falcon publisher for Atlas + */ +public abstract class FalconEventPublisher { + public static class Data { + private FalconEvent event; + + public Data(FalconEvent event) { + this.event = event; + } + + public FalconEvent getEvent() { + return event; + } + } + + public abstract void publish(final Data data) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java new file mode 100644 index 0000000..373846d --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.atlas.service; + +import org.apache.atlas.falcon.hook.FalconHook; +import org.apache.falcon.FalconException; +import org.apache.falcon.atlas.Util.EventUtil; +import org.apache.falcon.atlas.event.FalconEvent; +import org.apache.falcon.atlas.publisher.FalconEventPublisher; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.service.ConfigurationChangeListener; +import org.apache.falcon.service.FalconService; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Atlas service to publish Falcon events + */ +public class AtlasService implements FalconService, ConfigurationChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class); + private FalconEventPublisher publisher; + + /** + * Constant for the service name. 
+ */ + public static final String SERVICE_NAME = AtlasService.class.getSimpleName(); + + @Override + public String getName() { + return SERVICE_NAME; + } + + @Override + public void init() throws FalconException { + ConfigurationStore.get().registerListener(this); + publisher = new FalconHook(); + } + + + @Override + public void destroy() throws FalconException { + ConfigurationStore.get().unregisterListener(this); + } + + @Override + public void onAdd(Entity entity) throws FalconException { + EntityType entityType = entity.getEntityType(); + switch (entityType) { + case PROCESS: + addProcessEntity((Process) entity, FalconEvent.OPERATION.ADD_PROCESS); + break; + + default: + LOG.debug("Entity type not processed " + entityType); + } + } + + @Override + public void onRemove(Entity entity) throws FalconException { + } + + @Override + public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { + EntityType entityType = newEntity.getEntityType(); + switch (entityType) { + case PROCESS: + addProcessEntity((Process) newEntity, FalconEvent.OPERATION.UPDATE_PROCESS); + break; + + default: + LOG.debug("Entity type not processed " + entityType); + } + } + + @Override + public void onReload(Entity entity) throws FalconException { + //Since there is no import script that can import existing falcon entities to atlas, adding on falcon service start + onAdd(entity); + } + + private void addProcessEntity(Process entity, FalconEvent.OPERATION operation) throws FalconException { + LOG.info("Adding process entity to Atlas: {}", entity.getName()); + + try { + String user = entity.getACL() != null ? 
entity.getACL().getOwner() : + UserGroupInformation.getLoginUser().getShortUserName(); + FalconEvent event = new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity); + FalconEventPublisher.Data data = new FalconEventPublisher.Data(event); + publisher.publish(data); + } catch (Exception ex) { + throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java new file mode 100644 index 0000000..12b7a8b --- /dev/null +++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.falcon.hook; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.falcon.model.FalconDataTypes; +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.falcon.atlas.service.AtlasService; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.security.CurrentUser; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.xml.bind.JAXBException; +import java.util.List; + +import static org.testng.Assert.assertEquals; + +public class FalconHookIT { + public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class); + + public static final String CLUSTER_RESOURCE = "/cluster.xml"; + public static final String FEED_RESOURCE = "/feed.xml"; + public static final String PROCESS_RESOURCE = "/process.xml"; + + private AtlasClient dgiCLient; + + private static final ConfigurationStore STORE = ConfigurationStore.get(); + + @BeforeClass + public void setUp() throws Exception { + dgiCLient = new AtlasClient(ApplicationProperties.get().getString("atlas.rest.address")); + + AtlasService service = new AtlasService(); + service.init(); + STORE.registerListener(service); + CurrentUser.authenticate(System.getProperty("user.name")); + } + + private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException { + Entity entity = (Entity) 
type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource)); + switch (entity.getEntityType()) { + case CLUSTER: + ((Cluster) entity).setName(name); + break; + + case FEED: + ((Feed) entity).setName(name); + break; + + case PROCESS: + ((org.apache.falcon.entity.v0.process.Process) entity).setName(name); + break; + } + return (T)entity; + } + + private String random() { + return RandomStringUtils.randomAlphanumeric(10); + } + + private String getTableUri(String dbName, String tableName) { + return String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName, tableName); + } + + @Test (enabled = true) + public void testCreateProcess() throws Exception { + Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); + STORE.publish(EntityType.CLUSTER, cluster); + + Feed infeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedin" + random()); + org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0); + feedCluster.setName(cluster.getName()); + String inTableName = "table" + random(); + String inDbName = "db" + random(); + feedCluster.getTable().setUri(getTableUri(inDbName, inTableName)); + STORE.publish(EntityType.FEED, infeed); + + Feed outfeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedout" + random()); + feedCluster = outfeed.getClusters().getClusters().get(0); + feedCluster.setName(cluster.getName()); + String outTableName = "table" + random(); + String outDbName = "db" + random(); + feedCluster.getTable().setUri(getTableUri(outDbName, outTableName)); + STORE.publish(EntityType.FEED, outfeed); + + Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random()); + process.getClusters().getClusters().get(0).setName(cluster.getName()); + process.getInputs().getInputs().get(0).setFeed(infeed.getName()); + process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName()); + STORE.publish(EntityType.PROCESS, process); + + String pid = 
assertProcessIsRegistered(cluster.getName(), process.getName()); + Referenceable processEntity = dgiCLient.getEntity(pid); + assertEquals(processEntity.get("processName"), process.getName()); + + Id inId = (Id) ((List)processEntity.get("inputs")).get(0); + Referenceable inEntity = dgiCLient.getEntity(inId._getId()); + assertEquals(inEntity.get("name"), + HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), inDbName, inTableName)); + + Id outId = (Id) ((List)processEntity.get("outputs")).get(0); + Referenceable outEntity = dgiCLient.getEntity(outId._getId()); + assertEquals(outEntity.get("name"), + HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName)); + } + +// @Test (enabled = true, dependsOnMethods = "testCreateProcess") +// public void testUpdateProcess() throws Exception { +// FalconEvent event = createProcessEntity(PROCESS_NAME_2, INPUT, OUTPUT); +// FalconEventPublisher.Data data = new FalconEventPublisher.Data(event); +// hook.publish(data); +// String id = assertProcessIsRegistered(CLUSTER_NAME, PROCESS_NAME_2); +// event = createProcessEntity(PROCESS_NAME_2, INPUT_2, OUTPUT_2); +// hook.publish(data); +// String id2 = assertProcessIsRegistered(CLUSTER_NAME, PROCESS_NAME_2); +// if (!id.equals(id2)) { +// throw new Exception("Id mismatch"); +// } +// } + + private String assertProcessIsRegistered(String clusterName, String processName) throws Exception { + String name = processName + "@" + clusterName; + LOG.debug("Searching for process {}", name); + String query = String.format("%s as t where name = '%s' select t", + FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), name); + return assertEntityIsRegistered(query); + } + + private String assertEntityIsRegistered(final String query) throws Exception { + waitFor(20000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + JSONArray results = dgiCLient.search(query); + System.out.println(results); + return results.length() == 1; + } + }); + + 
JSONArray results = dgiCLient.search(query); + JSONObject row = results.getJSONObject(0).getJSONObject("t"); + + return row.getString("id"); + } + + + public interface Predicate { + + /** + * Perform a predicate evaluation. + * + * @return the boolean result of the evaluation. + * @throws Exception thrown if the predicate evaluation could not evaluate. + */ + boolean evaluate() throws Exception; + } + + /** + * Wait for a condition, expressed via a {@link Predicate} to become true. + * + * @param timeout maximum time in milliseconds to wait for the predicate to become true. + * @param predicate predicate waiting on. + */ + protected void waitFor(int timeout, Predicate predicate) throws Exception { + long mustEnd = System.currentTimeMillis() + timeout; + + boolean eval; + while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) { + LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis()); + Thread.sleep(1000); + } + if (!eval) { + throw new Exception("Waiting timed out after " + timeout + " msec"); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/cluster.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/resources/cluster.xml b/addons/falcon-bridge/src/test/resources/cluster.xml new file mode 100644 index 0000000..b183847 --- /dev/null +++ b/addons/falcon-bridge/src/test/resources/cluster.xml @@ -0,0 +1,45 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!-- + Primary cluster configuration for demo vm + --> +<cluster colo="west-coast" description="Primary Cluster" name="testcluster" + xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + + <interfaces> + <interface type="readonly" endpoint="hftp://localhost:10070" version="1.1.1" /> + + <interface type="write" endpoint="hdfs://localhost:10020" version="1.1.1" /> + + <interface type="execute" endpoint="localhost:10300" version="1.1.1" /> + + <interface type="workflow" endpoint="http://localhost:11010/oozie/" version="3.3.0" /> + + <interface type="registry" endpoint="thrift://localhost:19083" version="0.11.0" /> + + <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3" /> + </interfaces> + + <locations> + <location name="staging" path="/apps/falcon/staging" /> + <location name="temp" path="/tmp" /> + <location name="working" path="/apps/falcon/working" /> + </locations> + +</cluster> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/feed.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/resources/feed.xml b/addons/falcon-bridge/src/test/resources/feed.xml new file mode 100644 index 0000000..473c745 --- /dev/null +++ b/addons/falcon-bridge/src/test/resources/feed.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<feed description="test input" name="testinput" xmlns="uri:falcon:feed:0.1"> + <groups>online,bi</groups> + + <frequency>hours(1)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="hours(3)"/> + + <clusters> + <cluster name="testcluster" type="source"> + <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/> + <retention limit="hours(24)" action="delete"/> + <table uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> + </cluster> + </clusters> + + <table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> + + <ACL owner="testuser" group="group" permission="0x755"/> + <schema location="hcat" provider="hcat"/> +</feed> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/hive-site.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/resources/hive-site.xml b/addons/falcon-bridge/src/test/resources/hive-site.xml new file mode 100644 index 0000000..b106903 --- /dev/null +++ b/addons/falcon-bridge/src/test/resources/hive-site.xml @@ -0,0 +1,53 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration> + <property> + <name>hive.exec.post.hooks</name> + <value>org.apache.atlas.hive.hook.HiveHook</value> + </property> + + <property> + <name>hive.support.concurrency</name> + <value>false</value> + </property> + + <property> + <name>hive.metastore.warehouse.dir</name> + <value>${user.dir}/target/metastore</value> + </property> + + <property> + <name>javax.jdo.option.ConnectionURL</name> + <value>jdbc:derby:${user.dir}/target/metastore_db;create=true</value> + </property> + + <property> + <name>atlas.hook.hive.synchronous</name> + <value>true</value> + </property> + + <property> + <name>atlas.cluster.name</name> + <value>test</value> + </property> + + <property> + <name>fs.pfile.impl</name> + <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/process.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/resources/process.xml b/addons/falcon-bridge/src/test/resources/process.xml new file mode 100644 index 0000000..b94d0a8 --- /dev/null +++ b/addons/falcon-bridge/src/test/resources/process.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8" 
standalone="yes"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<process name="testprocess" xmlns="uri:falcon:process:0.1"> + <tags>[email protected], [email protected], department=forecasting</tags> + + <clusters> + <cluster name="testcluster"> + <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/> + </cluster> + </clusters> + + <parallel>1</parallel> + <order>FIFO</order> + <frequency>days(1)</frequency> + <timezone>UTC</timezone> + + <inputs> + <input end="today(0,0)" start="today(0,0)" feed="testinput" name="input"/> + </inputs> + + <outputs> + <output instance="now(0,0)" feed="testoutput" name="output"/> + </outputs> + + <properties> + <property name="blah" value="blah"/> + </properties> + + <workflow engine="hive" path="/falcon/test/apps/hive/script.hql"/> + + <retry policy="periodic" delay="minutes(10)" attempts="3"/> + + <late-process policy="exp-backoff" delay="hours(2)"> + <late-input input="input" workflow-path="/falcon/test/workflow"/> + </late-process> +</process> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/startup.properties ---------------------------------------------------------------------- diff --git 
a/addons/falcon-bridge/src/test/resources/startup.properties b/addons/falcon-bridge/src/test/resources/startup.properties new file mode 100644 index 0000000..2d0dba1 --- /dev/null +++ b/addons/falcon-bridge/src/test/resources/startup.properties @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +*.domain=debug +*.config.store.persist=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index ee5ae10..40babe5 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -18,6 +18,7 @@ package org.apache.atlas.hive.bridge; +import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasServiceException; @@ -487,9 +488,11 @@ public class HiveMetaStoreBridge { dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName()); LOG.info("Hive data model is already registered!"); } catch(AtlasServiceException ase) { - //Expected in case types do not exist - LOG.info("Registering Hive data model"); - dgiClient.createType(dataModelGenerator.getModelAsJson()); + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + //Expected in case types do not exist + LOG.info("Registering Hive data model"); + dgiClient.createType(dataModelGenerator.getModelAsJson()); + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java index 994c813..1eb2acf 100755 --- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java @@ -227,7 +227,7 @@ public class HiveDataModelGenerator { null), new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null), new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
