ATLAS-2491: Hive hook should use v2 notifications Signed-off-by: Madhan Neethiraj <[email protected]> (cherry picked from commit 6e02ec5b3a97ee2bfaf16ef5e875c14e383d5823)
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/5273ab69 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/5273ab69 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/5273ab69 Branch: refs/heads/branch-0.8 Commit: 5273ab69dfb3a731b1101a4e264056bc583f2782 Parents: 93d7b35 Author: rmani <[email protected]> Authored: Mon Mar 19 15:56:01 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Wed Mar 21 10:01:41 2018 -0700 ---------------------------------------------------------------------- .../apache/atlas/falcon/hook/FalconHookIT.java | 2 +- addons/hive-bridge/pom.xml | 6 + .../atlas/hive/bridge/ColumnLineageUtils.java | 158 -- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 1005 ++++++------ .../atlas/hive/hook/AtlasHiveHookContext.java | 91 ++ .../org/apache/atlas/hive/hook/HiveHook.java | 1211 ++------------- .../atlas/hive/hook/events/AlterDatabase.java | 42 + .../atlas/hive/hook/events/AlterTable.java | 42 + .../hive/hook/events/AlterTableRename.java | 173 +++ .../hive/hook/events/AlterTableRenameCol.java | 100 ++ .../atlas/hive/hook/events/BaseHiveEvent.java | 822 ++++++++++ .../atlas/hive/hook/events/CreateDatabase.java | 75 + .../hive/hook/events/CreateHiveProcess.java | 196 +++ .../atlas/hive/hook/events/CreateTable.java | 93 ++ .../atlas/hive/hook/events/DropDatabase.java | 68 + .../atlas/hive/hook/events/DropTable.java | 61 + .../apache/atlas/hive/rewrite/ASTRewriter.java | 26 - .../atlas/hive/rewrite/HiveASTRewriter.java | 95 -- .../atlas/hive/rewrite/LiteralRewriter.java | 76 - .../atlas/hive/rewrite/RewriteContext.java | 48 - .../atlas/hive/rewrite/RewriteException.java | 24 - .../java/org/apache/atlas/hive/HiveITBase.java | 556 ++++++- .../atlas/hive/bridge/ColumnLineageUtils.java | 159 ++ .../hive/bridge/HiveLiteralRewriterTest.java | 68 - .../hive/bridge/HiveMetaStoreBridgeTest.java | 189 ++- .../hive/bridge/HiveMetastoreBridgeIT.java | 53 +- .../org/apache/atlas/hive/hook/HiveHookIT.java | 1465 ++++++++++-------- .../apache/atlas/sqoop/hook/SqoopHookIT.java | 2 +- .../atlas/catalog/EntityResourceProvider.java | 2 +- .../catalog/EntityTagResourceProvider.java | 2 +- .../atlas/catalog/TaxonomyResourceProvider.java | 4 +- pom.xml | 4 +- .../atlas/repository/graph/DeleteHandler.java | 2 +- .../graph/v1/AtlasEntityChangeNotifier.java | 42 +- .../store/graph/v1/DeleteHandlerV1.java | 36 +- .../graph/v1/AtlasDeleteHandlerV1Test.java | 44 +- .../NotificationEntityChangeListener.java | 38 +- .../org/apache/atlas/examples/QuickStartIT.java | 2 +- .../web/filters/ActiveServerFilterTest.java | 2 +- .../atlas/web/integration/BaseResourceIT.java | 7 - .../web/integration/TypesJerseyResourceIT.java | 2 +- 41 files changed, 4241 insertions(+), 2852 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java index 7212921..e2d4bbb 100644 --- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java +++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java @@ -304,7 +304,7 @@ public class FalconHookIT { } private String assertEntityIsRegistered(final String typeName, final String property, final String value) throws Exception { - waitFor(1000, new Predicate() { + waitFor(80000, new Predicate() { @Override public void evaluate() throws Exception { Referenceable entity = atlasClient.getEntity(typeName, property, value); http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index 8876351..3b251cd 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -105,6 +105,12 @@ <dependency> <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v2</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> <artifactId>atlas-notification</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java deleted file mode 100644 index 663fcdc..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.hive.bridge; - -import org.apache.atlas.AtlasClient; -import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.hadoop.hive.ql.hooks.LineageInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class ColumnLineageUtils { - public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class); - public static class HiveColumnLineageInfo { - public final String depenendencyType; - public final String expr; - public final String inputColumn; - - HiveColumnLineageInfo(LineageInfo.Dependency d, String inputCol) { - depenendencyType = d.getType().name(); - expr = d.getExpr(); - inputColumn = inputCol; - } - - @Override - public String toString(){ - return inputColumn; - } - } - - public static String getQualifiedName(LineageInfo.DependencyKey key){ - String db = key.getDataContainer().getTable().getDbName(); - String table = key.getDataContainer().getTable().getTableName(); - String col = key.getFieldSchema().getName(); - return db + "." + table + "." + col; - } - - public static Map<String, List<HiveColumnLineageInfo>> buildLineageMap(LineageInfo lInfo) { - Map<String, List<HiveColumnLineageInfo>> m = new HashMap<>(); - - for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) { - List<HiveColumnLineageInfo> l = new ArrayList<>(); - String k = getQualifiedName(e.getKey()); - - if (LOG.isDebugEnabled()) { - LOG.debug("buildLineageMap(): key={}; value={}", e.getKey(), e.getValue()); - } - - Collection<LineageInfo.BaseColumnInfo> baseCols = getBaseCols(e.getValue()); - - if (baseCols != null) { - for (LineageInfo.BaseColumnInfo iCol : baseCols) { - String db = iCol.getTabAlias().getTable().getDbName(); - String table = iCol.getTabAlias().getTable().getTableName(); - String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName(); - l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName)); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k); - } - m.put(k, l); - } - } - return m; - } - - static Collection<LineageInfo.BaseColumnInfo> getBaseCols(LineageInfo.Dependency lInfoDep) { - Collection<LineageInfo.BaseColumnInfo> ret = null; - - if (lInfoDep != null) { - try { - Method getBaseColsMethod = lInfoDep.getClass().getMethod("getBaseCols"); - - Object retGetBaseCols = getBaseColsMethod.invoke(lInfoDep); - - if (retGetBaseCols != null) { - if (retGetBaseCols instanceof Collection) { - ret = (Collection) retGetBaseCols; - } else { - LOG.warn("{}: unexpected return type from LineageInfo.Dependency.getBaseCols(), expected type {}", - retGetBaseCols.getClass().getName(), "Collection"); - } - } - } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) { - LOG.warn("getBaseCols()", ex); - } - } - - return ret; - } - - static String[] extractComponents(String qualifiedName) { - String[] comps = qualifiedName.split("\\."); - int lastIdx = comps.length - 1; - int atLoc = comps[lastIdx].indexOf('@'); - if (atLoc > 0) { - comps[lastIdx] = comps[lastIdx].substring(0, atLoc); - } - return comps; - } - - static void populateColumnReferenceableMap(Map<String, Referenceable> m, - Referenceable r) { - if (r.getTypeName().equals(HiveDataTypes.HIVE_TABLE.getName())) { - String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); - String[] qNameComps = extractComponents(qName); - for (Referenceable col : (List<Referenceable>) r.get(HiveMetaStoreBridge.COLUMNS)) { - String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); - String[] colQNameComps = extractComponents(cName); - String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2]; - m.put(colQName, col); - } - String tableQName = qNameComps[0] + "." + qNameComps[1]; - m.put(tableQName, r); - } - } - - - public static Map<String, Referenceable> buildColumnReferenceableMap(List<Referenceable> inputs, - List<Referenceable> outputs) { - Map<String, Referenceable> m = new HashMap<>(); - - for (Referenceable r : inputs) { - populateColumnReferenceableMap(m, r); - } - - for (Referenceable r : outputs) { - populateColumnReferenceableMap(m, r); - } - - return m; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 6016f6d..637c101 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -21,23 +21,30 @@ package org.apache.atlas.hive.bridge; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.ApplicationProperties; -import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.hive.hook.HiveHook; +import org.apache.atlas.hive.hook.events.BaseHiveEvent; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.hook.AtlasHookException; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.commons.collections.CollectionUtils; + import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; +import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; @@ -55,115 +62,221 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Date; +import java.util.Collections; import java.util.List; -import static org.apache.atlas.hive.hook.HiveHook.CONF_PREFIX; +import java.util.Map; + +import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*; /** * A Bridge Utility that imports metadata from the Hive Meta Store * and registers them in Atlas. */ -public class HiveMetaStoreBridge { - private static final String DEFAULT_DGI_URL = "http://localhost:21000/"; - public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name"; - public static final String DEFAULT_CLUSTER_NAME = "primary"; - public static final String DESCRIPTION_ATTR = "description"; - - public static final String TEMP_TABLE_PREFIX = "_temp-"; - - private final String clusterName; - public static final long MILLIS_CONVERT_FACTOR = 1000; - - public static final String ATLAS_ENDPOINT = "atlas.rest.address"; - - public static final String COMMENT = "comment"; - public static final String PARAMETERS = "parameters"; - public static final String COLUMNS = "columns"; - public static final String POSITION = "position"; - public static final String PART_COLS = "partitionKeys"; - public static final String TABLE_ALIAS_LIST = "aliases"; - public static final String STORAGE_NUM_BUCKETS = "numBuckets"; - public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories"; - public static final String TABLE = "table"; - public static final String DB = "db"; - public static final String STORAGE_DESC = "sd"; - public static final String STORAGE_DESC_INPUT_FMT = "inputFormat"; - public static final String STORAGE_DESC_OUTPUT_FMT = "outputFormat"; - public static final String LOCATION = "location"; - public static final String TABLE_TYPE_ATTR = "tableType"; - public static final String CREATE_TIME = "createTime"; - public static final String LAST_ACCESS_TIME = "lastAccessTime"; - public static final String HDFS_PATH = "hdfs_path"; +public class HiveMetaStoreBridge { private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class); - public final Hive hiveClient; - private final AtlasClient atlasClient; - private final boolean convertHdfsPathToLowerCase; + public static final String CONF_PREFIX = "atlas.hook.hive."; + public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name"; + public static final String DEFAULT_CLUSTER_NAME = "primary"; + public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; + public static final String TEMP_TABLE_PREFIX = "_temp-"; + public static final String ATLAS_ENDPOINT = "atlas.rest.address"; + public static final String SEP = ":".intern(); + public static final String HDFS_PATH = "hdfs_path"; + public static final String DB = "db"; - HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) { - this(clusterName, hiveClient, atlasClient, true); - } + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; - HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient, boolean convertHdfsPathToLowerCase) { - this.clusterName = clusterName; - this.hiveClient = hiveClient; - this.atlasClient = atlasClient; - this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase; - } + private final String clusterName; + private final Hive hiveClient; + private final AtlasClientV2 atlasClientV2; + private final boolean convertHdfsPathToLowerCase; - public String getClusterName() { - return clusterName; + + public static void main(String[] args) throws AtlasHookException { + try { + Configuration atlasConf = ApplicationProperties.get(); + String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT); + + if (atlasEndpoint == null || atlasEndpoint.length == 0){ + atlasEndpoint = new String[] { DEFAULT_ATLAS_URL }; + } + + AtlasClientV2 atlasClientV2; + + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); + + atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword); + } else { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint); + } + + Options options = new Options(); + CommandLineParser parser = new BasicParser(); + CommandLine cmd = parser.parse(options, args); + boolean failOnError = cmd.hasOption("failOnError"); + + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2); + + hiveMetaStoreBridge.importHiveMetadata(failOnError); + } catch(Exception e) { + throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e); + } } /** * Construct a HiveMetaStoreBridge. * @param hiveConf {@link HiveConf} for Hive component in the cluster */ - public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf) throws Exception { - this(atlasProperties, hiveConf, null); + public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception { + this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2, atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, true)); } /** * Construct a HiveMetaStoreBridge. * @param hiveConf {@link HiveConf} for Hive component in the cluster */ - public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClient atlasClient) throws Exception { - this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClient, atlasProperties.getBoolean(CONF_PREFIX + "hdfs_path.convert_to_lowercase", true)); + public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf) throws Exception { + this(atlasProperties, hiveConf, null); } - AtlasClient getAtlasClient() { - return atlasClient; + HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2) { + this(clusterName, hiveClient, atlasClientV2, true); + } + + HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) { + this.clusterName = clusterName; + this.hiveClient = hiveClient; + this.atlasClientV2 = atlasClientV2; + this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase; + } + + public String getClusterName() { + return clusterName; + } + + public Hive getHiveClient() { + return hiveClient; + } + + public AtlasClientV2 getAtlasClient() { + return atlasClientV2; } public boolean isConvertHdfsPathToLowerCase() { return convertHdfsPathToLowerCase; } - void importHiveMetadata(boolean failOnError) throws Exception { - LOG.info("Importing hive metadata"); + + @VisibleForTesting + public void importHiveMetadata(boolean failOnError) throws Exception { + LOG.info("Importing Hive metadata"); + importDatabases(failOnError); } private void importDatabases(boolean failOnError) throws Exception { List<String> databases = hiveClient.getAllDatabases(); + + LOG.info("Found {} databases", databases.size()); + for (String databaseName : databases) { - Referenceable dbReference = registerDatabase(databaseName); + AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName); - if (dbReference != null) { - importTables(dbReference, databaseName, failOnError); + if (dbEntity != null) { + importTables(dbEntity.getEntity(), databaseName, failOnError); } } } /** - * Create a Hive Database entity - * @param hiveDB The Hive {@link Database} object from which to map properties - * @return new Hive Database entity - * @throws HiveException + * Imports all tables for the given db + * @param dbEntity + * @param databaseName + * @param failOnError + * @throws Exception */ - public Referenceable createDBInstance(Database hiveDB) throws HiveException { - return createOrUpdateDBInstance(hiveDB, null); + private int importTables(AtlasEntity dbEntity, String databaseName, final boolean failOnError) throws Exception { + List<String> hiveTables = hiveClient.getAllTables(databaseName); + + LOG.info("Found {} tables in database {}", hiveTables.size(), databaseName); + + int tablesImported = 0; + + try { + for (String tableName : hiveTables) { + int imported = importTable(dbEntity, databaseName, tableName, failOnError); + + tablesImported += imported; + } + } finally { + if (tablesImported == hiveTables.size()) { + LOG.info("Successfully imported all {} tables from database {}", tablesImported, databaseName); + } else { + LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, hiveTables.size(), databaseName); + } + } + + return tablesImported; + } + + @VisibleForTesting + public int importTable(AtlasEntity dbEntity, String databaseName, String tableName, final boolean failOnError) throws Exception { + try { + Table table = hiveClient.getTable(databaseName, tableName); + AtlasEntityWithExtInfo tableEntity = registerTable(dbEntity, table); + + if (table.getTableType() == TableType.EXTERNAL_TABLE) { + String processQualifiedName = getTableProcessQualifiedName(clusterName, table); + AtlasEntityWithExtInfo processEntity = findProcessEntity(processQualifiedName); + + if (processEntity == null) { + String tableLocation = isConvertHdfsPathToLowerCase() ? lower(table.getDataLocation().toString()) : table.getDataLocation().toString(); + String query = getCreateTableString(table, tableLocation); + AtlasEntity pathInst = toHdfsPathEntity(tableLocation); + AtlasEntity tableInst = tableEntity.getEntity(); + AtlasEntity processInst = new AtlasEntity(HiveDataTypes.HIVE_PROCESS.getName()); + long now = System.currentTimeMillis(); + + processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName); + processInst.setAttribute(ATTRIBUTE_NAME, query); + processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName); + processInst.setAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(pathInst))); + processInst.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(tableInst))); + processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner()); + processInst.setAttribute(ATTRIBUTE_START_TIME, now); + processInst.setAttribute(ATTRIBUTE_END_TIME, now); + processInst.setAttribute(ATTRIBUTE_OPERATION_TYPE, "CREATETABLE"); + processInst.setAttribute(ATTRIBUTE_QUERY_TEXT, query); + processInst.setAttribute(ATTRIBUTE_QUERY_ID, query); + processInst.setAttribute(ATTRIBUTE_QUERY_PLAN, "{}"); + processInst.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(query)); + + AtlasEntitiesWithExtInfo createTableProcess = new AtlasEntitiesWithExtInfo(); + + createTableProcess.addEntity(processInst); + createTableProcess.addEntity(pathInst); + + registerInstances(createTableProcess); + } else { + LOG.info("Process {} is already registered", processQualifiedName); + } + } + + return 1; + } catch (Exception e) { + LOG.error("Import failed for hive_table {}", tableName, e); + + if (failOnError) { + throw e; + } + + return 0; + } } /** @@ -172,493 +285,501 @@ public class HiveMetaStoreBridge { * @return * @throws Exception */ - private Referenceable registerDatabase(String databaseName) throws Exception { - Referenceable dbRef = getDatabaseReference(clusterName, databaseName); - Database db = hiveClient.getDatabase(databaseName); + private AtlasEntityWithExtInfo registerDatabase(String databaseName) throws Exception { + AtlasEntityWithExtInfo ret = null; + Database db = hiveClient.getDatabase(databaseName); if (db != null) { - if (dbRef == null) { - dbRef = createDBInstance(db); - dbRef = registerInstance(dbRef); + ret = findDatabase(clusterName, databaseName); + + if (ret == null) { + ret = registerInstance(new AtlasEntityWithExtInfo(toDbEntity(db))); } else { - LOG.info("Database {} is already registered with id {}. Updating it.", databaseName, dbRef.getId().id); - dbRef = createOrUpdateDBInstance(db, dbRef); - updateInstance(dbRef); + LOG.info("Database {} is already registered - id={}. Updating it.", databaseName, ret.getEntity().getGuid()); + + ret.setEntity(toDbEntity(db, ret.getEntity())); + + updateInstance(ret); } } - return dbRef; + + return ret; } - private Referenceable createOrUpdateDBInstance(Database hiveDB, Referenceable dbRef) { - LOG.info("Importing objects from databaseName : {}", hiveDB.getName()); + private AtlasEntityWithExtInfo registerTable(AtlasEntity dbEntity, Table table) throws AtlasHookException { + try { + AtlasEntityWithExtInfo ret; + AtlasEntityWithExtInfo tableEntity = findTableEntity(table); - if (dbRef == null) { - dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); - } - String dbName = hiveDB.getName().toLowerCase(); - dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName)); - dbRef.set(AtlasClient.NAME, dbName); - dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); - dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription()); - dbRef.set(LOCATION, hiveDB.getLocationUri()); - dbRef.set(PARAMETERS, hiveDB.getParameters()); - dbRef.set(AtlasClient.OWNER, hiveDB.getOwnerName()); - if (hiveDB.getOwnerType() != null) { - dbRef.set("ownerType", hiveDB.getOwnerType().getValue()); + if (tableEntity == null) { + tableEntity = toTableEntity(dbEntity, table); + + tableEntity.addReferredEntity(dbEntity); + + ret = registerInstance(tableEntity); + } else { + LOG.info("Table {}.{} is already registered with id {}. Updating entity.", table.getDbName(), table.getTableName(), tableEntity.getEntity().getGuid()); + + ret = toTableEntity(dbEntity, table, tableEntity); + + ret.addReferredEntity(dbEntity); + + updateInstance(ret); + } + + return ret; + } catch (Exception e) { + throw new AtlasHookException("HiveMetaStoreBridge.registerTable() failed.", e); } - return dbRef; } /** * Registers an entity in atlas - * @param referenceable + * @param entity * @return * @throws Exception */ - private Referenceable registerInstance(Referenceable referenceable) throws Exception { - String typeName = referenceable.getTypeName(); - LOG.debug("creating instance of type {}", typeName); + private AtlasEntityWithExtInfo registerInstance(AtlasEntityWithExtInfo entity) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), entity); + } - String entityJSON = InstanceSerialization.toJson(referenceable, true); - LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON); - List<String> guids = getAtlasClient().createEntity(entityJSON); - LOG.debug("created instance for type {}, guid: {}", typeName, guids); + AtlasEntityWithExtInfo ret = null; + EntityMutationResponse response = atlasClientV2.createEntity(entity); + List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); - return new Referenceable(guids.get(guids.size() - 1), referenceable.getTypeName(), null); - } + if (CollectionUtils.isNotEmpty(createdEntities)) { + for (AtlasEntityHeader createdEntity : createdEntities) { + if (ret == null) { + ret = atlasClientV2.getEntityByGuid(createdEntity.getGuid()); - /** - * Gets reference to the atlas entity for the database - * @param databaseName database Name - * @param clusterName cluster name - * @return Reference for database if exists, else null - * @throws Exception - */ - private Referenceable getDatabaseReference(String clusterName, String databaseName) throws Exception { - LOG.debug("Getting reference for database {}", databaseName); - String typeName = HiveDataTypes.HIVE_DB.getName(); + LOG.info("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid()); + } else if (ret.getEntity(createdEntity.getGuid()) == null) { + AtlasEntityWithExtInfo newEntity = atlasClientV2.getEntityByGuid(createdEntity.getGuid()); - return getEntityReference(typeName, getDBQualifiedName(clusterName, databaseName)); - } + ret.addReferredEntity(newEntity.getEntity()); - /** - * Construct the qualified name used to uniquely identify a Database instance in Atlas. - * @param clusterName Name of the cluster to which the Hive component belongs - * @param dbName Name of the Hive database - * @return Unique qualified name to identify the Database instance in Atlas. - */ - public static String getDBQualifiedName(String clusterName, String dbName) { - return String.format("%s@%s", dbName.toLowerCase(), clusterName); - } + if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) { + for (Map.Entry<String, AtlasEntity> entry : newEntity.getReferredEntities().entrySet()) { + ret.addReferredEntity(entry.getKey(), entry.getValue()); + } + } - private String getCreateTableString(Table table, String location){ - String colString = ""; - List<FieldSchema> colList = table.getAllCols(); - if ( colList != null) { - for (FieldSchema col : colList) { - colString += col.getName() + " " + col.getType() + ","; - } - if (colList.size() > 0) { - colString = colString.substring(0, colString.length() - 1); - colString = "(" + colString + ")"; + LOG.info("Created {} entity: name={}, guid={}", newEntity.getEntity().getTypeName(), newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), newEntity.getEntity().getGuid()); + } } } - String query = "create external table " + table.getTableName() + colString + - " location '" + location + "'"; - return query; + + return ret; } /** - * Imports all tables for the given db - * @param databaseReferenceable - * @param databaseName - * @param failOnError + * Registers an entity in atlas + * @param entities + * @return * @throws Exception */ - private int importTables(Referenceable databaseReferenceable, String databaseName, final boolean failOnError) throws Exception { - int tablesImported = 0; - List<String> hiveTables = hiveClient.getAllTables(databaseName); - LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName); - for (String tableName : hiveTables) { - int imported = importTable(databaseReferenceable, databaseName, tableName, failOnError); - tablesImported += imported; + private AtlasEntitiesWithExtInfo registerInstances(AtlasEntitiesWithExtInfo entities) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("creating {} entities: {}", entities.getEntities().size(), entities); } - if (tablesImported == hiveTables.size()) { - LOG.info("Successfully imported all {} tables from {} ", tablesImported, databaseName); - } else { - LOG.error("Able to import {} tables out of {} tables from {}. Please check logs for import errors", tablesImported, hiveTables.size(), databaseName); - } + AtlasEntitiesWithExtInfo ret = null; + EntityMutationResponse response = atlasClientV2.createEntities(entities); + List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); - return tablesImported; - } + if (CollectionUtils.isNotEmpty(createdEntities)) { + ret = new AtlasEntitiesWithExtInfo(); - @VisibleForTesting - public int importTable(Referenceable databaseReferenceable, String databaseName, String tableName, final boolean failOnError) throws Exception { - try { - Table table = hiveClient.getTable(databaseName, tableName); - Referenceable tableReferenceable = registerTable(databaseReferenceable, table); - if (table.getTableType() == TableType.EXTERNAL_TABLE) { - String tableQualifiedName = getTableProcessQualifiedName(clusterName, table); - Referenceable process = getProcessReference(tableQualifiedName); - if (process == null) { - LOG.info("Attempting to register create table process for {}", tableQualifiedName); - Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName()); - ArrayList<Referenceable> sourceList = new ArrayList<>(); - ArrayList<Referenceable> targetList = new ArrayList<>(); - String tableLocation = isConvertHdfsPathToLowerCase() ? HiveHook.lower(table.getDataLocation().toString()) : table.getDataLocation().toString(); - Referenceable path = fillHDFSDataSet(tableLocation); - String query = getCreateTableString(table, tableLocation); - sourceList.add(path); - targetList.add(tableReferenceable); - lineageProcess.set("inputs", sourceList); - lineageProcess.set("outputs", targetList); - lineageProcess.set("userName", table.getOwner()); - lineageProcess.set("startTime", new Date(System.currentTimeMillis())); - lineageProcess.set("endTime", new Date(System.currentTimeMillis())); - lineageProcess.set("operationType", "CREATETABLE"); - lineageProcess.set("queryText", query); - lineageProcess.set("queryId", query); - lineageProcess.set("queryPlan", "{}"); - lineageProcess.set("clusterName", clusterName); - List<String> recentQueries = new ArrayList<>(1); - recentQueries.add(query); - lineageProcess.set("recentQueries", recentQueries); - String processQualifiedName = getTableProcessQualifiedName(clusterName, table); - lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName); - lineageProcess.set(AtlasClient.NAME, query); - registerInstance(lineageProcess); - } else { - LOG.info("Process {} is already registered", process.toString()); + for (AtlasEntityHeader createdEntity : createdEntities) { + AtlasEntityWithExtInfo entity = atlasClientV2.getEntityByGuid(createdEntity.getGuid()); + + ret.addEntity(entity.getEntity()); + + if (MapUtils.isNotEmpty(entity.getReferredEntities())) { + for (Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) { + ret.addReferredEntity(entry.getKey(), entry.getValue()); + } } + + LOG.info("Created {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid()); } - return 1; - } catch (Exception e) { - LOG.error("Import failed for hive_table {} ", tableName, e); - if (failOnError) { - throw e; - } - return 0; } - } - - /** - * Gets reference for the table - * - * @param hiveTable - * @return table reference if exists, else null - * @throws Exception - */ - private Referenceable getTableReference(Table hiveTable) throws Exception { - LOG.debug("Getting reference for table {}.{}", hiveTable.getDbName(), hiveTable.getTableName()); - String typeName = HiveDataTypes.HIVE_TABLE.getName(); - String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName()); - return getEntityReference(typeName, tblQualifiedName); + return ret; } - private Referenceable getEntityReference(final String typeName, final String tblQualifiedName) throws AtlasServiceException { - AtlasClient dgiClient = getAtlasClient(); - try { - return dgiClient.getEntity(typeName, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tblQualifiedName); - } catch (AtlasServiceException e) { - if(e.getStatus() == ClientResponse.Status.NOT_FOUND) { - return null; - } - throw e; + private void updateInstance(AtlasEntityWithExtInfo entity) throws AtlasServiceException { + if (LOG.isDebugEnabled()) { + LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), entity); } - } - private Referenceable getProcessReference(String qualifiedName) throws Exception{ - LOG.debug("Getting reference for process {}", qualifiedName); - String typeName = HiveDataTypes.HIVE_PROCESS.getName(); - return getEntityReference(typeName, qualifiedName); + atlasClientV2.updateEntity(entity); + + LOG.info("Updated {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid()); } /** - * Construct the qualified name used to uniquely identify a Table instance in Atlas. - * @param clusterName Name of the cluster to which the Hive component belongs - * @param dbName Name of the Hive database to which the Table belongs - * @param tableName Name of the Hive table - * @return Unique qualified name to identify the Table instance in Atlas. + * Create a Hive Database entity + * @param hiveDB The Hive {@link Database} object from which to map properties + * @return new Hive Database AtlasEntity + * @throws HiveException */ - public static String getTableQualifiedName(String clusterName, String dbName, String tableName, boolean isTemporaryTable) { - String tableTempName = tableName; - if (isTemporaryTable) { - if (SessionState.get() != null && SessionState.get().getSessionId() != null) { - tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId(); - } else { - tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10); - } - } - return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), clusterName); + private AtlasEntity toDbEntity(Database hiveDB) throws HiveException { + return toDbEntity(hiveDB, null); } + private AtlasEntity toDbEntity(Database hiveDB, AtlasEntity dbEntity) { + if (dbEntity == null) { + dbEntity = new AtlasEntity(HiveDataTypes.HIVE_DB.getName()); + } + String dbName = hiveDB.getName().toLowerCase(); - /** - * Construct the qualified name used to uniquely identify a Table instance in Atlas. - * @param clusterName Name of the cluster to which the Hive component belongs - * @param table hive table for which the qualified name is needed - * @return Unique qualified name to identify the Table instance in Atlas. - */ - public static String getTableQualifiedName(String clusterName, Table table) { - return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary()); - } + dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getDBQualifiedName(clusterName, dbName)); + dbEntity.setAttribute(ATTRIBUTE_NAME, dbName); + dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription()); + dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName()); - public static String getTableProcessQualifiedName(String clusterName, Table table) { - String tableQualifiedName = getTableQualifiedName(clusterName, table); - Date createdTime = getTableCreatedTime(table); - return tableQualifiedName + HiveHook.SEP + createdTime.getTime(); - } + dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName); + dbEntity.setAttribute(ATTRIBUTE_LOCATION, hiveDB.getLocationUri()); + dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters()); - /** - * Construct the qualified name used to uniquely identify a Table instance in Atlas. - * @param clusterName Name of the cluster to which the Hive component belongs - * @param dbName Name of the Hive database to which the Table belongs - * @param tableName Name of the Hive table - * @return Unique qualified name to identify the Table instance in Atlas. - */ - public static String getTableQualifiedName(String clusterName, String dbName, String tableName) { - return getTableQualifiedName(clusterName, dbName, tableName, false); - } + if (hiveDB.getOwnerType() != null) { + dbEntity.setAttribute(ATTRIBUTE_OWNER_TYPE,OWNER_TYPE_TO_ENUM_VALUE.get(hiveDB.getOwnerType().getValue())); + } + return dbEntity; + } /** * Create a new table instance in Atlas - * @param dbReference reference to a created Hive database {@link Referenceable} to which this table belongs + * @param database AtlasEntity for Hive {@link AtlasEntity} to which this table belongs * @param hiveTable reference to the Hive {@link Table} from which to map properties - * @return Newly created Hive reference + * @return Newly created Hive AtlasEntity * @throws Exception */ - public Referenceable createTableInstance(Referenceable dbReference, Table hiveTable) - throws AtlasHookException { - return createOrUpdateTableInstance(dbReference, null, hiveTable); + private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, Table hiveTable) throws AtlasHookException { + return toTableEntity(database, hiveTable, null); } - public static Date getTableCreatedTime(Table table) { - return new Date(table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR); - } + private AtlasEntityWithExtInfo toTableEntity(AtlasEntity database, final Table hiveTable, AtlasEntityWithExtInfo table) throws AtlasHookException { + if (table == null) { + table = new AtlasEntityWithExtInfo(new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName())); + } - private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference, - final Table hiveTable) throws AtlasHookException { - LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName()); + AtlasEntity tableEntity = table.getEntity(); + String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable); + long createTime = BaseHiveEvent.getTableCreateTime(hiveTable); + long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime; - if (tableReference == null) { - tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); + tableEntity.setAttribute(ATTRIBUTE_DB, BaseHiveEvent.getObjectId(database)); + tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName); + tableEntity.setAttribute(ATTRIBUTE_NAME, hiveTable.getTableName().toLowerCase()); + tableEntity.setAttribute(ATTRIBUTE_OWNER, hiveTable.getOwner()); + + tableEntity.setAttribute(ATTRIBUTE_CREATE_TIME, createTime); + tableEntity.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime); + tableEntity.setAttribute(ATTRIBUTE_RETENTION, hiveTable.getRetention()); + tableEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveTable.getParameters()); + tableEntity.setAttribute(ATTRIBUTE_COMMENT, hiveTable.getParameters().get(ATTRIBUTE_COMMENT)); + tableEntity.setAttribute(ATTRIBUTE_TABLE_TYPE, hiveTable.getTableType().name()); + tableEntity.setAttribute(ATTRIBUTE_TEMPORARY, hiveTable.isTemporary()); + + if (hiveTable.getViewOriginalText() != null) { + tableEntity.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, hiveTable.getViewOriginalText()); } - String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable); - tableReference.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName); - tableReference.set(AtlasClient.NAME, hiveTable.getTableName().toLowerCase()); - tableReference.set(AtlasClient.OWNER, hiveTable.getOwner()); - - Date createDate = new Date(); - if (hiveTable.getTTable() != null){ - try { - createDate = getTableCreatedTime(hiveTable); - LOG.debug("Setting create time to {} ", createDate); - tableReference.set(CREATE_TIME, createDate); - } catch(Exception ne) { - LOG.error("Error while setting createTime for the table {} ", hiveTable.getCompleteName(), ne); - } + if (hiveTable.getViewExpandedText() != null) { + tableEntity.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, hiveTable.getViewExpandedText()); } - Date lastAccessTime = createDate; - if ( hiveTable.getLastAccessTime() > 0) { - lastAccessTime = new Date(hiveTable.getLastAccessTime() * MILLIS_CONVERT_FACTOR); + AtlasEntity sdEntity = toStroageDescEntity(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), BaseHiveEvent.getObjectId(tableEntity)); + List<AtlasEntity> partKeys = toColumns(hiveTable.getPartitionKeys(), tableEntity); + List<AtlasEntity> columns = toColumns(hiveTable.getCols(), tableEntity); + + tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, BaseHiveEvent.getObjectId(sdEntity)); + tableEntity.setAttribute(ATTRIBUTE_PARTITION_KEYS, BaseHiveEvent.getObjectIds(partKeys)); + tableEntity.setAttribute(ATTRIBUTE_COLUMNS, BaseHiveEvent.getObjectIds(columns)); + + if (MapUtils.isNotEmpty(table.getReferredEntities())) { + table.getReferredEntities().clear(); + } + + table.addReferredEntity(database); + table.addReferredEntity(sdEntity); + + if (partKeys != null) { + for (AtlasEntity partKey : partKeys) { + table.addReferredEntity(partKey); + } } - tableReference.set(LAST_ACCESS_TIME, lastAccessTime); - tableReference.set("retention", hiveTable.getRetention()); - tableReference.set(COMMENT, hiveTable.getParameters().get(COMMENT)); + if (columns != null) { + for (AtlasEntity column : columns) { + table.addReferredEntity(column); + } + } - // add reference to the database - tableReference.set(DB, dbReference); + return table; + } - // add reference to the StorageDescriptor - Referenceable sdReferenceable = fillStorageDesc(hiveTable.getSd(), tableQualifiedName, getStorageDescQFName(tableQualifiedName), tableReference.getId()); - tableReference.set(STORAGE_DESC, sdReferenceable); + private AtlasEntity toStroageDescEntity(StorageDescriptor storageDesc, String tableQualifiedName, String sdQualifiedName, AtlasObjectId tableId ) throws AtlasHookException { + AtlasEntity ret = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName()); - tableReference.set(PARAMETERS, hiveTable.getParameters()); + ret.setAttribute(ATTRIBUTE_TABLE, tableId); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName); + ret.setAttribute(ATTRIBUTE_PARAMETERS, storageDesc.getParameters()); + ret.setAttribute(ATTRIBUTE_LOCATION, storageDesc.getLocation()); + ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, storageDesc.getInputFormat()); + ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, storageDesc.getOutputFormat()); + ret.setAttribute(ATTRIBUTE_COMPRESSED, storageDesc.isCompressed()); + ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, storageDesc.getNumBuckets()); + ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, storageDesc.isStoredAsSubDirectories()); - if (hiveTable.getViewOriginalText() != null) { - tableReference.set("viewOriginalText", hiveTable.getViewOriginalText()); + if (storageDesc.getBucketCols().size() > 0) { + ret.setAttribute(ATTRIBUTE_BUCKET_COLS, storageDesc.getBucketCols()); } - if (hiveTable.getViewExpandedText() != null) { - tableReference.set("viewExpandedText", hiveTable.getViewExpandedText()); + if (storageDesc.getSerdeInfo() != null) { + SerDeInfo serdeInfo = storageDesc.getSerdeInfo(); + + LOG.debug("serdeInfo = {}", serdeInfo); + // SkewedInfo skewedInfo = storageDesc.getSkewedInfo(); + + AtlasStruct serdeInfoStruct = new AtlasStruct(HiveDataTypes.HIVE_SERDE.getName()); + + serdeInfoStruct.setAttribute(ATTRIBUTE_NAME, serdeInfo.getName()); + serdeInfoStruct.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, serdeInfo.getSerializationLib()); + serdeInfoStruct.setAttribute(ATTRIBUTE_PARAMETERS, serdeInfo.getParameters()); + + ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfoStruct); } - tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name()); - tableReference.set("temporary", hiveTable.isTemporary()); + if (CollectionUtils.isNotEmpty(storageDesc.getSortCols())) { + List<AtlasStruct> sortColsStruct = new ArrayList<>(); - // add reference to the Partition Keys - List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableReference); - tableReference.set("partitionKeys", partKeys); + for (Order sortcol : storageDesc.getSortCols()) { + String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName(); + AtlasStruct colStruct = new AtlasStruct(hiveOrderName); + colStruct.setAttribute("col", sortcol.getCol()); + colStruct.setAttribute("order", sortcol.getOrder()); - tableReference.set(COLUMNS, getColumns(hiveTable.getCols(), tableReference)); + sortColsStruct.add(colStruct); + } - return tableReference; + ret.setAttribute(ATTRIBUTE_SORT_COLS, sortColsStruct); + } + + return ret; } - public static String getStorageDescQFName(String entityQualifiedName) { - return entityQualifiedName + "_storage"; + private List<AtlasEntity> toColumns(List<FieldSchema> schemaList, AtlasEntity table) throws AtlasHookException { + List<AtlasEntity> ret = new ArrayList<>(); + + int columnPosition = 0; + for (FieldSchema fs : schemaList) { + LOG.debug("Processing field {}", fs); + + AtlasEntity column = new AtlasEntity(HiveDataTypes.HIVE_COLUMN.getName()); + + column.setAttribute(ATTRIBUTE_TABLE, BaseHiveEvent.getObjectId(table)); + column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getColumnQualifiedName((String) table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), fs.getName())); + column.setAttribute(ATTRIBUTE_NAME, fs.getName()); + column.setAttribute(ATTRIBUTE_OWNER, table.getAttribute(ATTRIBUTE_OWNER)); + column.setAttribute(ATTRIBUTE_COL_TYPE, fs.getType()); + column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition++); + column.setAttribute(ATTRIBUTE_COMMENT, fs.getComment()); + + ret.add(column); + } + return ret; } - private Referenceable registerTable(Referenceable dbReference, Table table) throws AtlasHookException { - try { - String dbName = table.getDbName(); - String tableName = table.getTableName(); - LOG.info("Attempting to register table [{}]", tableName); - Referenceable tableReference = getTableReference(table); - LOG.info("Found result {}", tableReference); - if (tableReference == null) { - tableReference = createTableInstance(dbReference, table); - tableReference = registerInstance(tableReference); - } else { - LOG.info("Table {}.{} is already registered with id {}. Updating entity.", dbName, tableName, - tableReference.getId().id); - tableReference = createOrUpdateTableInstance(dbReference, tableReference, table); - updateInstance(tableReference); - } - return tableReference; - } catch (Exception e) { - throw new AtlasHookException("HiveMetaStoreBridge.getStorageDescQFName() failed.", e); + private AtlasEntity toHdfsPathEntity(String pathUri) { + AtlasEntity ret = new AtlasEntity(HDFS_PATH); + Path path = new Path(pathUri); + + ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); + ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName); + ret.setAttribute(ATTRIBUTE_PATH, pathUri); + + // Only append clusterName for the HDFS path + if (pathUri.startsWith("hdfs://")) { + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(pathUri)); + } else { + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathUri); } + + return ret; } - private void updateInstance(Referenceable referenceable) throws AtlasServiceException { - String typeName = referenceable.getTypeName(); - LOG.debug("updating instance of type {}", typeName); + /** + * Gets the atlas entity for the database + * @param databaseName database Name + * @param clusterName cluster name + * @return AtlasEntity for database if exists, else null + * @throws Exception + */ + private AtlasEntityWithExtInfo findDatabase(String clusterName, String databaseName) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Searching Atlas for database {}", databaseName); + } - String entityJSON = InstanceSerialization.toJson(referenceable, true); - LOG.debug("Updating entity {} = {}", referenceable.getTypeName(), entityJSON); + String typeName = HiveDataTypes.HIVE_DB.getName(); - atlasClient.updateEntity(referenceable.getId().id, referenceable); + return findEntity(typeName, getDBQualifiedName(clusterName, databaseName)); } - public Referenceable fillStorageDesc(StorageDescriptor storageDesc, String tableQualifiedName, - String sdQualifiedName, Id tableId) throws AtlasHookException { - LOG.debug("Filling storage descriptor information for {}", storageDesc); + /** + * Gets Atlas Entity for the table + * + * @param hiveTable + * @return table entity from Atlas if exists, else null + * @throws Exception + */ + private AtlasEntityWithExtInfo findTableEntity(Table hiveTable) throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Searching Atlas for table {}.{}", hiveTable.getDbName(), hiveTable.getTableName()); + } - Referenceable sdReferenceable = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName()); - sdReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sdQualifiedName); + String typeName = HiveDataTypes.HIVE_TABLE.getName(); + String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName()); - SerDeInfo serdeInfo = storageDesc.getSerdeInfo(); - LOG.debug("serdeInfo = {}", serdeInfo); - // SkewedInfo skewedInfo = storageDesc.getSkewedInfo(); + return findEntity(typeName, tblQualifiedName); + } + + private AtlasEntityWithExtInfo findProcessEntity(String qualifiedName) throws Exception{ + if (LOG.isDebugEnabled()) { + LOG.debug("Searching Atlas for process {}", qualifiedName); + } - String serdeInfoName = HiveDataTypes.HIVE_SERDE.getName(); - Struct serdeInfoStruct = new Struct(serdeInfoName); + String typeName = HiveDataTypes.HIVE_PROCESS.getName(); - serdeInfoStruct.set(AtlasClient.NAME, serdeInfo.getName()); - serdeInfoStruct.set("serializationLib", serdeInfo.getSerializationLib()); - serdeInfoStruct.set(PARAMETERS, serdeInfo.getParameters()); + return findEntity(typeName, qualifiedName); + } - sdReferenceable.set("serdeInfo", serdeInfoStruct); - sdReferenceable.set(STORAGE_NUM_BUCKETS, storageDesc.getNumBuckets()); - sdReferenceable - .set(STORAGE_IS_STORED_AS_SUB_DIRS, storageDesc.isStoredAsSubDirectories()); + private AtlasEntityWithExtInfo findEntity(final String typeName, final String qualifiedName) throws AtlasServiceException { + AtlasClientV2 atlasClientV2 = getAtlasClient(); - List<Struct> sortColsStruct = new ArrayList<>(); - for (Order sortcol : storageDesc.getSortCols()) { - String hiveOrderName = HiveDataTypes.HIVE_ORDER.getName(); - Struct colStruct = new Struct(hiveOrderName); - colStruct.set("col", sortcol.getCol()); - colStruct.set("order", sortcol.getOrder()); + try { + return atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); + } catch (AtlasServiceException e) { + if(e.getStatus() == ClientResponse.Status.NOT_FOUND) { + return null; + } - sortColsStruct.add(colStruct); + throw e; } - if (sortColsStruct.size() > 0) { - sdReferenceable.set("sortCols", sortColsStruct); + } + + private String getCreateTableString(Table table, String location){ + String colString = ""; + List<FieldSchema> colList = table.getAllCols(); + + if (colList != null) { + for (FieldSchema col : colList) { + colString += col.getName() + " " + col.getType() + ","; + } + + if (colList.size() > 0) { + colString = colString.substring(0, colString.length() - 1); + colString = "(" + colString + ")"; + } } - sdReferenceable.set(LOCATION, storageDesc.getLocation()); - sdReferenceable.set("inputFormat", storageDesc.getInputFormat()); - sdReferenceable.set("outputFormat", storageDesc.getOutputFormat()); - sdReferenceable.set("compressed", storageDesc.isCompressed()); + String query = "create external table " + table.getTableName() + colString + " location '" + location + "'"; - if (storageDesc.getBucketCols().size() > 0) { - sdReferenceable.set("bucketCols", storageDesc.getBucketCols()); + return query; + } + + private String lower(String str) { + if (StringUtils.isEmpty(str)) { + return null; } - sdReferenceable.set(PARAMETERS, storageDesc.getParameters()); - sdReferenceable.set("storedAsSubDirectories", storageDesc.isStoredAsSubDirectories()); - sdReferenceable.set(TABLE, tableId); + return str.toLowerCase().trim(); + } + - return sdReferenceable; + /** + * Construct the qualified name used to uniquely identify a Table instance in Atlas. + * @param clusterName Name of the cluster to which the Hive component belongs + * @param table hive table for which the qualified name is needed + * @return Unique qualified name to identify the Table instance in Atlas. + */ + private static String getTableQualifiedName(String clusterName, Table table) { + return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary()); } - public Referenceable fillHDFSDataSet(String pathUri) { - Referenceable ref = new Referenceable(HDFS_PATH); - ref.set("path", pathUri); - Path path = new Path(pathUri); - ref.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); - ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri); - ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); - return ref; + private String getHdfsPathQualifiedName(String hdfsPath) { + return String.format("%s@%s", hdfsPath, clusterName); } - public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) { - final String[] parts = tableQualifiedName.split("@"); - final String tableName = parts[0]; - final String clusterName = parts[1]; - return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName); + /** + * Construct the qualified name used to uniquely identify a Database instance in Atlas. + * @param clusterName Name of the cluster to which the Hive component belongs + * @param dbName Name of the Hive database + * @return Unique qualified name to identify the Database instance in Atlas. + */ + public static String getDBQualifiedName(String clusterName, String dbName) { + return String.format("%s@%s", dbName.toLowerCase(), clusterName); } - public List<Referenceable> getColumns(List<FieldSchema> schemaList, Referenceable tableReference) throws AtlasHookException { - List<Referenceable> colList = new ArrayList<>(); - int columnPosition = 0; - for (FieldSchema fs : schemaList) { - LOG.debug("Processing field {}", fs); - Referenceable colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName()); - colReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - getColumnQualifiedName((String) tableReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), fs.getName())); - colReferenceable.set(AtlasClient.NAME, fs.getName()); - colReferenceable.set(AtlasClient.OWNER, tableReference.get(AtlasClient.OWNER)); - colReferenceable.set("type", fs.getType()); - colReferenceable.set(POSITION, columnPosition++); - colReferenceable.set(COMMENT, fs.getComment()); - colReferenceable.set(TABLE, tableReference.getId()); - - - colList.add(colReferenceable); + /** + * Construct the qualified name used to uniquely identify a Table instance in Atlas. + * @param clusterName Name of the cluster to which the Hive component belongs + * @param dbName Name of the Hive database to which the Table belongs + * @param tableName Name of the Hive table + * @param isTemporaryTable is this a temporary table + * @return Unique qualified name to identify the Table instance in Atlas. + */ + public static String getTableQualifiedName(String clusterName, String dbName, String tableName, boolean isTemporaryTable) { + String tableTempName = tableName; + + if (isTemporaryTable) { + if (SessionState.get() != null && SessionState.get().getSessionId() != null) { + tableTempName = tableName + TEMP_TABLE_PREFIX + SessionState.get().getSessionId(); + } else { + tableTempName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.random(10); + } } - return colList; + + return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), clusterName); } + public static String getTableProcessQualifiedName(String clusterName, Table table) { + String tableQualifiedName = getTableQualifiedName(clusterName, table); + long createdTime = getTableCreatedTime(table); - public static void main(String[] args) throws AtlasHookException { - try { - Configuration atlasConf = ApplicationProperties.get(); - String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT); - if (atlasEndpoint == null || atlasEndpoint.length == 0){ - atlasEndpoint = new String[] { DEFAULT_DGI_URL }; - } - AtlasClient atlasClient; + return tableQualifiedName + SEP + createdTime; + } - if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { - String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); - atlasClient = new AtlasClient(atlasEndpoint, basicAuthUsernamePassword); - } else { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - atlasClient = new AtlasClient(ugi, ugi.getShortUserName(), atlasEndpoint); - } - Options options = new Options(); - CommandLineParser parser = new BasicParser(); - CommandLine cmd = parser.parse( options, args); + /** + * Construct the qualified name used to uniquely identify a Table instance in Atlas. + * @param clusterName Name of the cluster to which the Hive component belongs + * @param dbName Name of the Hive database to which the Table belongs + * @param tableName Name of the Hive table + * @return Unique qualified name to identify the Table instance in Atlas. + */ + public static String getTableQualifiedName(String clusterName, String dbName, String tableName) { + return getTableQualifiedName(clusterName, dbName, tableName, false); + } - boolean failOnError = false; - if (cmd.hasOption("failOnError")) { - failOnError = true; - } + public static String getStorageDescQFName(String tableQualifiedName) { + return tableQualifiedName + "_storage"; + } - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClient); - hiveMetaStoreBridge.importHiveMetadata(failOnError); - } - catch(Exception e) { - throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e); - } + + public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) { + final String[] parts = tableQualifiedName.split("@"); + final String tableName = parts[0]; + final String clusterName = parts[1]; + + return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName); + } + + public static long getTableCreatedTime(Table table) { + return table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java new file mode 100644 index 0000000..9105ebe --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hive.hook; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.plan.HiveOperation; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +public class AtlasHiveHookContext { + private final HiveHook hook; + private final HiveOperation hiveOperation; + private final HookContext hiveContext; + private final Hive hive; + private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>(); + + public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext) throws Exception { + this.hook = hook; + this.hiveOperation = hiveOperation; + this.hiveContext = hiveContext; + this.hive = Hive.get(hiveContext.getConf()); + } + + public HookContext getHiveContext() { + return hiveContext; + } + + public Hive getHive() { + return hive; + } + + public HiveOperation getHiveOperation() { + return hiveOperation; + } + + public void putEntity(String qualifiedName, AtlasEntity entity) { + qNameEntityMap.put(qualifiedName, entity); + } + + public AtlasEntity getEntity(String qualifiedName) { + return qNameEntityMap.get(qualifiedName); + } + + public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); } + + + public String getClusterName() { + return hook.getClusterName(); + } + + public boolean isKnownDatabase(String dbQualifiedName) { + return hook.isKnownDatabase(dbQualifiedName); + } + + public boolean isKnownTable(String tblQualifiedName) { + return hook.isKnownTable(tblQualifiedName); + } + + public void addToKnownEntities(Collection<AtlasEntity> entities) { + hook.addToKnownEntities(entities); + } + + public void removeFromKnownDatabase(String dbQualifiedName) { + hook.removeFromKnownDatabase(dbQualifiedName); + } + + public void removeFromKnownTable(String tblQualifiedName) { + hook.removeFromKnownTable(tblQualifiedName); + } +}
