Repository: atlas Updated Branches: refs/heads/master f11c18016 -> 801aea9a8
ATLAS-2586: added import-kafka bridge to import Kafka topic into Atlas Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/801aea9a Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/801aea9a Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/801aea9a Branch: refs/heads/master Commit: 801aea9a8173a4856ce7f812d6790475d99a3b94 Parents: f11c180 Author: rmani <[email protected]> Authored: Thu Apr 19 22:56:21 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Apr 20 16:29:24 2018 -0700 ---------------------------------------------------------------------- addons/kafka-bridge/pom.xml | 399 +++++++++++++++++++ addons/kafka-bridge/src/bin/import-kafka.sh | 145 +++++++ .../apache/atlas/kafka/bridge/KafkaBridge.java | 350 ++++++++++++++++ .../atlas/kafka/model/KafkaDataTypes.java | 31 ++ .../main/resources/atlas-kafka-import-log4j.xml | 55 +++ .../patches/006-kafka_topic_add_attribute.json | 21 + distro/pom.xml | 1 + .../assemblies/atlas-kafka-hook-package.xml | 57 +++ .../src/main/assemblies/standalone-package.xml | 13 +- pom.xml | 6 + 10 files changed, 1077 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/addons/kafka-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml new file mode 100644 index 0000000..52f90d6 --- /dev/null +++ b/addons/kafka-bridge/pom.xml @@ -0,0 +1,399 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. 
The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>apache-atlas</artifactId> + <groupId>org.apache.atlas</groupId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + <artifactId>kafka-bridge</artifactId> + <description>Apache Atlas Kafka Bridge Module</description> + <name>Apache Atlas Kafka Bridge</name> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v1</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v2</artifactId> + </dependency> + + <!-- to bring up atlas server for integration tests --> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-bundle</artifactId> + <version>1.19</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-webapp</artifactId> + <type>war</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + 
<artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>4.4.6</version> + </dependency> + + <dependency> + <groupId>com.101tec</groupId> + <artifactId>zkclient</artifactId> + <version>${zkclient.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${kafka.scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-webapp</artifactId> + <version>${jetty.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>12.0.1</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + 
<dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${hadoop.version}</version> + <scope>compile</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>dist</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-hook</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/dependency/hook/kafka/atlas-kafka-plugin-impl</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-client-v1</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-client-common</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-client-v2</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-intg</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-common</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${kafka.scala.binary.version}</artifactId> + <version>${kafka.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + 
<version>${kafka.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-multipart</artifactId> + <version>${jersey.version}</version> + </artifactItem> + <artifactItem> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>${commons-conf.version}</version> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + <build> + <plugins> + <plugin> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-maven-plugin</artifactId> + <configuration> + <skip>${skipTests}</skip> + <!--only skip int tests --> + <httpConnector> + <port>31000</port> + <idleTimeout>60000</idleTimeout> + </httpConnector> + <war>../../webapp/target/atlas-webapp-${project.version}.war</war> + <daemon>true</daemon> + <webApp> + <contextPath>/</contextPath> + <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath> + </webApp> + <useTestScope>true</useTestScope> + <systemProperties> + <force>true</force> + <systemProperty> + <name>atlas.home</name> + <value>${project.build.directory}</value> + </systemProperty> + <systemProperty> + <key>atlas.conf</key> + <value>${project.build.directory}/test-classes</value> + </systemProperty> + <systemProperty> + <name>atlas.data</name> + <value>${project.build.directory}/data</value> + </systemProperty> + <systemProperty> + <name>atlas.log.dir</name> + <value>${project.build.directory}/logs</value> + </systemProperty> + <systemProperty> + <name>atlas.log.file</name> + <value>application.log</value> + </systemProperty> + <systemProperty> + <name>log4j.configuration</name> + <value>file:///${project.build.directory}/test-classes/atlas-log4j.xml</value> + </systemProperty> + <systemProperty> + <name>atlas.graphdb.backend</name> + 
<value>${graphdb.backend.impl}</value> + </systemProperty> + <systemProperty> + <key>embedded.solr.directory</key> + <value>${project.build.directory}</value> + </systemProperty> + </systemProperties> + <stopKey>atlas-stop</stopKey> + <stopPort>31001</stopPort> + <stopWait>${jetty-maven-plugin.stopWait}</stopWait> + </configuration> + <executions> + <execution> + <id>start-jetty</id> + <phase>pre-integration-test</phase> + <goals> + <goal>deploy-war</goal> + </goals> + <configuration> + <daemon>true</daemon> + </configuration> + </execution> + <execution> + <id>stop-jetty</id> + <phase>post-integration-test</phase> + <goals> + <goal>stop</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-module-twiki</artifactId> + <version>${doxia.version}</version> + </dependency> + <dependency> + <groupId>org.apache.maven.doxia</groupId> + <artifactId>doxia-core</artifactId> + <version>${doxia.version}</version> + </dependency> + </dependencies> + <executions> + <execution> + <goals> + <goal>site</goal> + </goals> + <phase>prepare-package</phase> + </execution> + </executions> + <configuration> + <generateProjectInfo>false</generateProjectInfo> + <generateReports>false</generateReports> + </configuration> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <inherited>false</inherited> + <executions> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-resources</id> + <phase>validate</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/target/models</outputDirectory> + <resources> + <resource> + 
<directory>${basedir}/../models</directory> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-solr-resources</id> + <phase>validate</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/solr</outputDirectory> + <resources> + <resource> + <directory>${basedir}/../../test-tools/src/main/resources/solr</directory> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/addons/kafka-bridge/src/bin/import-kafka.sh ---------------------------------------------------------------------- diff --git a/addons/kafka-bridge/src/bin/import-kafka.sh b/addons/kafka-bridge/src/bin/import-kafka.sh new file mode 100644 index 0000000..33b4652 --- /dev/null +++ b/addons/kafka-bridge/src/bin/import-kafka.sh @@ -0,0 +1,145 @@ +#!/bin/bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. See accompanying LICENSE file. 
+# + +# resolve links - $0 may be a softlink +PRG="${0}" + +[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true + +while [ -h "${PRG}" ]; do + ls=`ls -ld "${PRG}"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "${PRG}"`/"$link" + fi +done + +echo ">>>>> $PRG" + +BASEDIR=`dirname ${PRG}` +BASEDIR=`cd ${BASEDIR}/..;pwd` + +echo ">>>>> $BASEDIR" + +allargs=$@ + +if test -z "${JAVA_HOME}" +then + JAVA_BIN=`which java` + JAR_BIN=`which jar` +else + JAVA_BIN="${JAVA_HOME}/bin/java" + JAR_BIN="${JAVA_HOME}/bin/jar" +fi +export JAVA_BIN + +if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then + echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available." + exit 1 +fi + +# Construct Atlas classpath using jars from hook/kafka/atlas-kafka-plugin-impl/ directory. +for i in "${BASEDIR}/hook/kafka/atlas-kafka-plugin-impl/"*.jar; do + ATLASCPPATH="${ATLASCPPATH}:$i" +done + +ATLAS_CONF_DIR=/etc/atlas/conf +ATLASCPPATH=${ATLASCPPATH}:${ATLAS_CONF_DIR} + +# log dir for applications +ATLAS_LOG_DIR="${ATLAS_LOG_DIR:-$BASEDIR/logs}" +export ATLAS_LOG_DIR +LOGFILE="$ATLAS_LOG_DIR/import-kafka.log" + +TIME=`date +%Y%m%d%H%M%s` + +#Add Kafka conf in classpath +if [ ! -z "$KAFKA_CONF_DIR" ]; then + KAFKA_CONF=$KAFKA_CONF_DIR +elif [ ! -z "$KAFKA_HOME" ]; then + KAFKA_CONF="$KAFKA_HOME/conf" +elif [ -e /etc/kafka/conf ]; then + KAFKA_CONF="/etc/kafka/conf" +else + echo "Could not find a valid KAFKA configuration" + exit 1 +fi + +echo Using Kafka configuration directory "[$KAFKA_CONF]" + + +if [ -f "${KAFKA_CONF}/kafka-env.sh" ]; then + . 
"${KAFKA_CONF}/kafka-env.sh" +fi + +if [ -z "$KAFKA_HOME" ]; then + if [ -d "${BASEDIR}/../kafka" ]; then + KAFKA_HOME=${BASEDIR}/../kafka + else + echo "Please set KAFKA_HOME to the root of Kafka installation" + exit 1 + fi +fi + +KAFKA_CP="${KAFKA_CONF}" + +for i in "${KAFKA_HOME}/libs/"*.jar; do + KAFKA_CP="${KAFKA_CP}:$i" +done + + +#Add hadoop conf in classpath +if [ ! -z "$HADOOP_CLASSPATH" ]; then + HADOOP_CP=$HADOOP_CLASSPATH +elif [ ! -z "$HADOOP_HOME" ]; then + HADOOP_CP=`$HADOOP_HOME/bin/hadoop classpath` +elif [ $(command -v hadoop) ]; then + HADOOP_CP=`hadoop classpath` + #echo $HADOOP_CP +else + echo "Environment variable HADOOP_CLASSPATH or HADOOP_HOME need to be set" + exit 1 +fi + +CP="${KAFKA_CP}:${ATLASCPPATH}:${HADOOP_CP}" + +# If running in cygwin, convert pathnames and classpath to Windows format. +if [ "${CYGWIN}" == "true" ] +then + ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}` + LOGFILE=`cygpath -w ${LOGFILE}` + KAFKA_CP=`cygpath -w ${KAFKA_CP}` + HADOOP_CP=`cygpath -w ${HADOOP_CP}` + CP=`cygpath -w -p ${CP}` +fi + +JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=import-kafka.log +-Dlog4j.configuration=atlas-kafka-import-log4j.xml" +shift + +while [[ ${1} =~ ^\-D ]]; do + JAVA_PROPERTIES="${JAVA_PROPERTIES} ${1}" + shift +done + +echo "Log file for import is $LOGFILE" + +"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.kafka.bridge.KafkaBridge $allargs + +RETVAL=$? +[ $RETVAL -eq 0 ] && echo Kafka Data Model imported successfully!!! +[ $RETVAL -ne 0 ] && echo Failed to import Kafka Data Model!!! 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java ---------------------------------------------------------------------- diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java new file mode 100644 index 0000000..da6e7b9 --- /dev/null +++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.kafka.bridge; + +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.kafka.model.KafkaDataTypes; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class KafkaBridge { + private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class); + + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_FAILED = 1; + private static final String ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final String KAFKA_CLUSTER_NAME = "atlas.cluster.name"; + private static final String DEFAULT_CLUSTER_NAME = "primary"; + private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + private 
static final String DESCRIPTION_ATTR = "description"; + private static final String PARTITION_COUNT = "partitionCount"; + private static final String NAME = "name"; + private static final String URI = "uri"; + private static final String CLUSTERNAME = "clusterName"; + private static final String TOPIC = "topic"; + + private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s"; + private static final String ZOOKEEPER_CONNECT = "atlas.kafka.zookeeper.connect"; + private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS = "atlas.kafka.zookeeper.connection.timeout.ms"; + private static final String ZOOKEEPER_SESSION_TIMEOUT_MS = "atlas.kafka.zookeeper.session.timeout.ms"; + private static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181"; + private static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10 * 1000; + private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 * 1000; + + private final List<String> availableTopics; + private final String clusterName; + private final AtlasClientV2 atlasClientV2; + private final ZkUtils zkUtils; + + + public static void main(String[] args) { + int exitCode = EXIT_CODE_FAILED; + + try { + Options options = new Options(); + options.addOption("t","topic", true, "topic"); + options.addOption("f", "filename", true, "filename"); + + CommandLineParser parser = new BasicParser(); + CommandLine cmd = parser.parse(options, args); + String topicToImport = cmd.getOptionValue("t"); + String fileToImport = cmd.getOptionValue("f"); + Configuration atlasConf = ApplicationProperties.get(); + String[] urls = atlasConf.getStringArray(ATLAS_ENDPOINT); + + if (urls == null || urls.length == 0) { + urls = new String[] { DEFAULT_ATLAS_URL }; + } + + final AtlasClientV2 atlasClientV2; + + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); + + atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword); + } else 
{ + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls); + } + + KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2); + + if (StringUtils.isNotEmpty(fileToImport)) { + File f = new File(fileToImport); + + if (f.exists() && f.canRead()) { + BufferedReader br = new BufferedReader(new FileReader(f)); + String line = null; + + while((line = br.readLine()) != null) { + topicToImport = line.trim(); + + importer.importTopic(topicToImport); + } + + exitCode = EXIT_CODE_SUCCESS; + } else { + LOG.error("Failed to read the file"); + } + } else { + importer.importTopic(topicToImport); + + exitCode = EXIT_CODE_SUCCESS; + } + } catch(ParseException e) { + LOG.error("Failed to parse arguments. Error: ", e.getMessage()); + printUsage(); + } catch(Exception e) { + System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message"); + e.printStackTrace(); + LOG.error("ImportKafkaEntities failed", e); + } + + System.exit(exitCode); + } + + public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception { + String zookeeperConnect = getZKConnection(atlasConf); + int sessionTimeOutMs = atlasConf.getInt(ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS) ; + int connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS); + ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$); + + this.atlasClientV2 = atlasClientV2; + this.clusterName = atlasConf.getString(KAFKA_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled()); + this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()); + } + + public void importTopic(String topicToImport) throws Exception { + List<String> 
topics = availableTopics; + + if (StringUtils.isNotEmpty(topicToImport)) { + List<String> topics_subset = new ArrayList<>(); + for(String topic : topics) { + if (topic.startsWith(topicToImport)) { + topics_subset.add(topic); + } + } + topics = topics_subset; + } + + if (CollectionUtils.isNotEmpty(topics)) { + for(String topic : topics) { + createOrUpdateTopic(topic); + } + } + } + + protected AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception { + String topicQualifiedName = getTopicQualifiedName(clusterName, topic); + AtlasEntityWithExtInfo topicEntity = findTopicEntityInAtlas(topicQualifiedName); + + if (topicEntity == null) { + System.out.println("Adding Kafka topic " + topic); + LOG.info("Importing Kafka topic: {}", topicQualifiedName); + + AtlasEntity entity = getTopicEntity(topic, null); + + topicEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity)); + } else { + System.out.println("Updating Kafka topic " + topic); + LOG.info("Kafka topic {} already exists in Atlas. 
Updating it..", topicQualifiedName); + + AtlasEntity entity = getTopicEntity(topic, topicEntity.getEntity()); + + topicEntity.setEntity(entity); + + topicEntity = updateEntityInAtlas(topicEntity); + } + + return topicEntity; + } + + protected AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) { + final AtlasEntity ret; + + if (topicEntity == null) { + ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName()); + } else { + ret = topicEntity; + } + + String qualifiedName = getTopicQualifiedName(clusterName, topic); + + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName); + ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(TOPIC, topic); + ret.setAttribute(NAME,topic); + ret.setAttribute(DESCRIPTION_ATTR, topic); + ret.setAttribute(URI, topic); + ret.setAttribute(PARTITION_COUNT, (Integer) zkUtils.getTopicPartitionCount(topic).get()); + + return ret; + } + + protected static String getTopicQualifiedName(String clusterName, String topic) { + return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), clusterName); + } + + private AtlasEntityWithExtInfo findTopicEntityInAtlas(String topicQualifiedName) { + AtlasEntityWithExtInfo ret = null; + + try { + ret = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName); + clearRelationshipAttributes(ret); + } catch (Exception e) { + ret = null; // entity doesn't exist in Atlas + } + + return ret; + } + + private AtlasEntityWithExtInfo findEntityInAtlas(String typeName, String qualifiedName) throws Exception { + Map<String, String> attributes = Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName); + + return atlasClientV2.getEntityByAttribute(typeName, attributes); + } + + private AtlasEntityWithExtInfo createEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception { + AtlasEntityWithExtInfo ret = null; + EntityMutationResponse response = atlasClientV2.createEntity(entity); + List<AtlasEntityHeader> entities = 
response.getCreatedEntities(); + + if (CollectionUtils.isNotEmpty(entities)) { + AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid()); + + ret = getByGuidResponse; + + LOG.info("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid()); + } + + return ret; + } + + private AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception { + AtlasEntityWithExtInfo ret = null; + EntityMutationResponse response = atlasClientV2.updateEntity(entity); + + if (response != null) { + List<AtlasEntityHeader> entities = response.getUpdatedEntities(); + + if (CollectionUtils.isNotEmpty(entities)) { + AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid()); + + ret = getByGuidResponse; + + LOG.info("Updated {} entity: name={}, guid={} ", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid()); + } else { + LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas" ); + + ret = entity; + } + } else { + LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas" ); + + ret = entity; + } + + return ret; + } + + private static void printUsage(){ + System.out.println("Usage 1: import-kafka.sh"); + System.out.println("Usage 2: import-kafka.sh [-n <topic regex> OR --topic <topic regex >]"); + System.out.println("Usage 3: import-kafka.sh [-f <filename>]" ); + System.out.println(" Format:"); + System.out.println(" topic1"); + System.out.println(" topic2"); + System.out.println(" topic3"); + } + + + private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) { + if (entity != null) { + clearRelationshipAttributes(entity.getEntity()); + + if (entity.getReferredEntities() != null) { + 
clearRelationshipAttributes(entity.getReferredEntities().values()); + } + } + } + + private void clearRelationshipAttributes(Collection<AtlasEntity> entities) { + if (entities != null) { + for (AtlasEntity entity : entities) { + clearRelationshipAttributes(entity); + } + } + } + + private void clearRelationshipAttributes(AtlasEntity entity) { + if (entity != null && entity.getRelationshipAttributes() != null) { + entity.getRelationshipAttributes().clear(); + } + } + + private String getStringValue(String[] vals) { + String ret = null; + for(String val:vals) { + ret = (ret == null) ? val : ret + "," + val; + } + return ret; + } + + private String getZKConnection(Configuration atlasConf) { + String ret = null; + ret = getStringValue(atlasConf.getStringArray(ZOOKEEPER_CONNECT)); + if (StringUtils.isEmpty(ret) ) { + ret = DEFAULT_ZOOKEEPER_CONNECT; + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java ---------------------------------------------------------------------- diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java new file mode 100644 index 0000000..0f81b4c --- /dev/null +++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.kafka.model; + +/** + * HBASE Data Types for model and bridge. + */ +public enum KafkaDataTypes { + // Classes + KAFKA_TOPIC; + + public String getName() { + return name().toLowerCase(); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/addons/kafka-bridge/src/main/resources/atlas-kafka-import-log4j.xml ---------------------------------------------------------------------- diff --git a/addons/kafka-bridge/src/main/resources/atlas-kafka-import-log4j.xml b/addons/kafka-bridge/src/main/resources/atlas-kafka-import-log4j.xml new file mode 100644 index 0000000..3fc2dcf --- /dev/null +++ b/addons/kafka-bridge/src/main/resources/atlas-kafka-import-log4j.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/> + </layout> + </appender> + + <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/> + <param name="Append" value="true"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/> + </layout> + </appender> + + + <logger name="org.apache.atlas" additivity="false"> + <level value="info"/> + <appender-ref ref="FILE"/> + </logger> + + <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config --> + <logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false"> + <level value="error"/> + <appender-ref ref="FILE"/> + </logger> + + <root> + <priority value="info"/> + <appender-ref ref="FILE"/> + </root> + +</log4j:configuration> http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/addons/models/1000-Hadoop/patches/006-kafka_topic_add_attribute.json ---------------------------------------------------------------------- diff --git a/addons/models/1000-Hadoop/patches/006-kafka_topic_add_attribute.json b/addons/models/1000-Hadoop/patches/006-kafka_topic_add_attribute.json new file mode 100644 index 0000000..6e1c9bc --- /dev/null +++ b/addons/models/1000-Hadoop/patches/006-kafka_topic_add_attribute.json @@ -0,0 +1,21 @@ +{ + "patches": [ + { + "action": "ADD_ATTRIBUTE", + "typeName": "kafka_topic", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": null, + "attributeDefs": [ + { + "name": "partitionCount", + "typeName": "int", + "cardinality": "SINGLE", + 
"isIndexable": true, + "isOptional": true, + "isUnique": false + } + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/distro/pom.xml ---------------------------------------------------------------------- diff --git a/distro/pom.xml b/distro/pom.xml index 1f4c6d5..9f6f2bd 100644 --- a/distro/pom.xml +++ b/distro/pom.xml @@ -116,6 +116,7 @@ atlas.graph.index.search.solr.wait-searcher=true <descriptor>src/main/assemblies/atlas-falcon-hook-package.xml</descriptor> <descriptor>src/main/assemblies/atlas-sqoop-hook-package.xml</descriptor> <descriptor>src/main/assemblies/atlas-storm-hook-package.xml</descriptor> + <descriptor>src/main/assemblies/atlas-kafka-hook-package.xml</descriptor> <descriptor>src/main/assemblies/standalone-package.xml</descriptor> <descriptor>src/main/assemblies/src-package.xml</descriptor> <descriptor>src/main/assemblies/migration-exporter.xml</descriptor> http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/distro/src/main/assemblies/atlas-kafka-hook-package.xml ---------------------------------------------------------------------- diff --git a/distro/src/main/assemblies/atlas-kafka-hook-package.xml b/distro/src/main/assemblies/atlas-kafka-hook-package.xml new file mode 100644 index 0000000..8394ea8 --- /dev/null +++ b/distro/src/main/assemblies/atlas-kafka-hook-package.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <formats> + <format>tar.gz</format> + <format>dir</format> + </formats> + <id>kafka-hook</id> + <baseDirectory>apache-atlas-kafka-hook-${project.version}</baseDirectory> + <fileSets> + <fileSet> + <directory>target/bin</directory> + <outputDirectory>hook-bin</outputDirectory> + <includes> + <include>import-kafka.sh</include> + </includes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <!-- addons/kafka --> + <fileSet> + <directory>../addons/kafka-bridge/src/bin</directory> + <outputDirectory>hook-bin</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <!-- addons/kafka dependencies --> + <fileSet> + <directory>../addons/kafka-bridge/target/dependency/bridge</directory> + <outputDirectory>bridge</outputDirectory> + </fileSet> + <fileSet> + <directory>../addons/kafka-bridge/target/dependency/hook</directory> + <outputDirectory>hook</outputDirectory> + </fileSet> + </fileSets> +</assembly> http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/distro/src/main/assemblies/standalone-package.xml ---------------------------------------------------------------------- diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml index 
dc2a66b..dc88add 100755 --- a/distro/src/main/assemblies/standalone-package.xml +++ b/distro/src/main/assemblies/standalone-package.xml @@ -176,7 +176,18 @@ <!-- for kafka topic setup --> <fileSet> - <directory>../notification/target/dependency/hook</directory> + <directory>../addons/kafka-bridge/src/bin</directory> + <outputDirectory>hook-bin</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>../addons/kafka-bridge/target/dependency/bridge</directory> + <outputDirectory>bridge</outputDirectory> + </fileSet> + <fileSet> + <directory>../addons/kafka-bridge/target/dependency/hook</directory> <outputDirectory>hook</outputDirectory> </fileSet> </fileSets> http://git-wip-us.apache.org/repos/asf/atlas/blob/801aea9a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9c9d746..1b94aa7 100644 --- a/pom.xml +++ b/pom.xml @@ -767,6 +767,7 @@ <module>addons/storm-bridge</module> <module>addons/hbase-bridge-shim</module> <module>addons/hbase-bridge</module> + <module>addons/kafka-bridge</module> <module>tools/atlas-migration-exporter</module> <module>distro</module> @@ -1492,6 +1493,11 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>kafka-bridge</artifactId> + <version>${project.version}</version> + </dependency> <!-- API documentation --> <dependency>
