http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java new file mode 100644 index 0000000..47de783 --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -0,0 +1,822 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.atlas.hive.hook.events;

import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyKey;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;


/**
 * Common base for the Hive hook event handlers in this package.
 *
 * <p>Holds the {@link AtlasHiveHookContext} of the operation being processed and provides
 * helpers that convert Hive objects (databases, tables, storage descriptors, columns, DFS
 * paths) into {@link AtlasEntity} instances and compute their {@code qualifiedName}
 * attributes. Entities built here are registered in the context (via
 * {@code context.putEntity}) so that repeated requests for the same qualified name return
 * the same instance.</p>
 */
public abstract class BaseHiveEvent {
    private static final Logger LOG = LoggerFactory.getLogger(BaseHiveEvent.class);

    public static final String HIVE_TYPE_DB             = "hive_db";
    public static final String HIVE_TYPE_TABLE          = "hive_table";
    public static final String HIVE_TYPE_STORAGEDESC    = "hive_storagedesc";
    public static final String HIVE_TYPE_COLUMN         = "hive_column";
    public static final String HIVE_TYPE_PROCESS        = "hive_process";
    public static final String HIVE_TYPE_COLUMN_LINEAGE = "hive_column_lineage";
    public static final String HIVE_TYPE_SERDE          = "hive_serde";
    public static final String HIVE_TYPE_ORDER          = "hive_order";
    public static final String HDFS_TYPE_PATH           = "hdfs_path";

    public static final String ATTRIBUTE_QUALIFIED_NAME            = "qualifiedName";
    public static final String ATTRIBUTE_NAME                      = "name";
    public static final String ATTRIBUTE_DESCRIPTION               = "description";
    public static final String ATTRIBUTE_OWNER                     = "owner";
    public static final String ATTRIBUTE_CLUSTER_NAME              = "clusterName";
    public static final String ATTRIBUTE_LOCATION                  = "location";
    public static final String ATTRIBUTE_PARAMETERS                = "parameters";
    public static final String ATTRIBUTE_OWNER_TYPE                = "ownerType";
    public static final String ATTRIBUTE_COMMENT                   = "comment";
    public static final String ATTRIBUTE_CREATE_TIME               = "createTime";
    public static final String ATTRIBUTE_LAST_ACCESS_TIME          = "lastAccessTime";
    public static final String ATTRIBUTE_VIEW_ORIGINAL_TEXT        = "viewOriginalText";
    public static final String ATTRIBUTE_VIEW_EXPANDED_TEXT        = "viewExpandedText";
    public static final String ATTRIBUTE_TABLE_TYPE                = "tableType";
    public static final String ATTRIBUTE_TEMPORARY                 = "temporary";
    public static final String ATTRIBUTE_RETENTION                 = "retention";
    public static final String ATTRIBUTE_DB                        = "db";
    public static final String ATTRIBUTE_STORAGEDESC               = "sd";
    public static final String ATTRIBUTE_PARTITION_KEYS            = "partitionKeys";
    public static final String ATTRIBUTE_COLUMNS                   = "columns";
    public static final String ATTRIBUTE_INPUT_FORMAT              = "inputFormat";
    public static final String ATTRIBUTE_OUTPUT_FORMAT             = "outputFormat";
    public static final String ATTRIBUTE_COMPRESSED                = "compressed";
    public static final String ATTRIBUTE_BUCKET_COLS               = "bucketCols";
    public static final String ATTRIBUTE_NUM_BUCKETS               = "numBuckets";
    public static final String ATTRIBUTE_STORED_AS_SUB_DIRECTORIES = "storedAsSubDirectories";
    public static final String ATTRIBUTE_TABLE                     = "table";
    public static final String ATTRIBUTE_SERDE_INFO                = "serdeInfo";
    public static final String ATTRIBUTE_SERIALIZATION_LIB         = "serializationLib";
    public static final String ATTRIBUTE_SORT_COLS                 = "sortCols";
    public static final String ATTRIBUTE_COL_TYPE                  = "type";
    public static final String ATTRIBUTE_COL_POSITION              = "position";
    public static final String ATTRIBUTE_PATH                      = "path";
    public static final String ATTRIBUTE_NAMESERVICE_ID            = "nameServiceId";
    public static final String ATTRIBUTE_INPUTS                    = "inputs";
    public static final String ATTRIBUTE_OUTPUTS                   = "outputs";
    public static final String ATTRIBUTE_OPERATION_TYPE            = "operationType";
    public static final String ATTRIBUTE_START_TIME                = "startTime";
    public static final String ATTRIBUTE_USER_NAME                 = "userName";
    public static final String ATTRIBUTE_QUERY_TEXT                = "queryText";
    public static final String ATTRIBUTE_QUERY_ID                  = "queryId";
    public static final String ATTRIBUTE_QUERY_PLAN                = "queryPlan";
    public static final String ATTRIBUTE_END_TIME                  = "endTime";
    public static final String ATTRIBUTE_RECENT_QUERIES            = "recentQueries";
    public static final String ATTRIBUTE_QUERY                     = "query";
    // NOTE(review): the value is spelled "depenendencyType"; it is a runtime attribute key, so it is
    // left untouched here - confirm against the type definition before "correcting" it.
    public static final String ATTRIBUTE_DEPENDENCY_TYPE           = "depenendencyType";
    public static final String ATTRIBUTE_EXPRESSION                = "expression";
    public static final String ATTRIBUTE_ALIASES                   = "aliases";


    public static final char   QNAME_SEP_CLUSTER_NAME = '@';
    public static final char   QNAME_SEP_ENTITY_NAME  = '.';
    public static final char   QNAME_SEP_PROCESS      = ':';
    public static final String TEMP_TABLE_PREFIX      = "_temp-";
    public static final long   MILLIS_CONVERT_FACTOR  = 1000;

    /** Maps the integer value of a Hive database owner type to the enum value name stored in 'ownerType'. */
    public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>();


    static {
        OWNER_TYPE_TO_ENUM_VALUE.put(1, "USER");
        OWNER_TYPE_TO_ENUM_VALUE.put(2, "ROLE");
        OWNER_TYPE_TO_ENUM_VALUE.put(3, "GROUP");
    }

    protected final AtlasHiveHookContext context;


    /**
     * @param context hook context of the Hive operation this event handles
     */
    protected BaseHiveEvent(AtlasHiveHookContext context) {
        this.context = context;
    }

    /**
     * @return the hook context this event was created with
     */
    public AtlasHiveHookContext getContext() {
        return context;
    }

    /**
     * Returns the notifications to be sent for this event.
     *
     * @return {@code null} in this base implementation; subclasses override to return messages
     * @throws Exception declared for the benefit of overriding implementations
     */
    public List<HookNotificationMessage> getNotificationMessages() throws Exception {
        return null;
    }

    /**
     * Returns the create time of a table, scaled by {@link #MILLIS_CONVERT_FACTOR}.
     *
     * @param table Hive table
     * @return scaled create time of the underlying thrift table; the current time in milliseconds
     *         when the table has no underlying thrift table
     */
    public static long getTableCreateTime(Table table) {
        return table.getTTable() != null ? (table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR) : System.currentTimeMillis();
    }

    /**
     * Builds an object-id that references the given entity by guid, type name and qualifiedName.
     *
     * @param entity entity to reference; must not be {@code null}
     * @return object-id carrying the entity's guid, type name and 'qualifiedName' attribute
     */
    public static AtlasObjectId getObjectId(AtlasEntity entity) {
        String        qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
        AtlasObjectId ret           = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));

        return ret;
    }

    /**
     * Builds object-ids for each of the given entities, preserving order.
     *
     * @param entities entities to reference; may be {@code null} or empty
     * @return one object-id per entity; an empty list when {@code entities} is null or empty
     */
    public static List<AtlasObjectId> getObjectIds(List<AtlasEntity> entities) {
        final List<AtlasObjectId> ret;

        if (CollectionUtils.isNotEmpty(entities)) {
            ret = new ArrayList<>(entities.size());

            for (AtlasEntity entity : entities) {
                ret.add(getObjectId(entity));
            }
        } else {
            ret = Collections.emptyList();
        }

        return ret;
    }


    /**
     * Adds every entity held in the context to {@code entitiesWithExtInfo} as a referred entity,
     * compacts the result, and records all resulting entities as known in the context.
     *
     * @param entitiesWithExtInfo container being prepared for notification
     */
    protected void addProcessedEntities(AtlasEntitiesWithExtInfo entitiesWithExtInfo) {
        for (AtlasEntity entity : context.getEntities()) {
            entitiesWithExtInfo.addReferredEntity(entity);
        }

        entitiesWithExtInfo.compact();

        context.addToKnownEntities(entitiesWithExtInfo.getEntities());

        if (entitiesWithExtInfo.getReferredEntities() != null) {
            context.addToKnownEntities(entitiesWithExtInfo.getReferredEntities().values());
        }
    }

    /**
     * Converts a Hive input/output entity to an Atlas entity; only TABLE, PARTITION and DFS_DIR
     * entities are converted.
     *
     * @param entity        Hive hook entity
     * @param entityExtInfo container that receives referred entities; may be {@code null}
     * @return the Atlas entity, or {@code null} for any other entity type
     * @throws Exception on failure to retrieve details from Hive
     */
    protected AtlasEntity getInputOutputEntity(Entity entity, AtlasEntityExtInfo entityExtInfo) throws Exception {
        AtlasEntity ret = null;

        switch (entity.getType()) {
            case TABLE:
            case PARTITION:
            case DFS_DIR: {
                ret = toAtlasEntity(entity, entityExtInfo);
            }
            break;

            default:
            break;
        }

        return ret;
    }

    /**
     * Converts a Hive hook entity (DATABASE, TABLE, PARTITION or DFS_DIR) to an Atlas entity.
     * For PARTITION entities the entity of the enclosing table is returned.
     *
     * @param entity        Hive hook entity
     * @param entityExtInfo container that receives referred entities; may be {@code null}
     * @return the Atlas entity; {@code null} for unsupported types or a DFS_DIR without location
     * @throws Exception on failure to retrieve details from Hive
     */
    protected AtlasEntity toAtlasEntity(Entity entity, AtlasEntityExtInfo entityExtInfo) throws Exception {
        AtlasEntity ret = null;

        switch (entity.getType()) {
            case DATABASE: {
                Database db = getHive().getDatabase(entity.getDatabase().getName());

                ret = toDbEntity(db);
            }
            break;

            case TABLE:
            case PARTITION: {
                Table table = getHive().getTable(entity.getTable().getDbName(), entity.getTable().getTableName());

                ret = toTableEntity(table, entityExtInfo);
            }
            break;

            case DFS_DIR: {
                URI location = entity.getLocation();

                if (location != null) {
                    ret = getHDFSPathEntity(new Path(location));
                }
            }
            break;

            default:
            break;
        }

        return ret;
    }

    /**
     * Returns the 'hive_db' entity for the given database, creating and caching it in the
     * context on first use.
     *
     * @param db Hive database
     * @return the database entity
     * @throws Exception declared for callers; not thrown by the visible code paths
     */
    protected AtlasEntity toDbEntity(Database db) throws Exception {
        String  dbQualifiedName = getQualifiedName(db);
        boolean isKnownDatabase = context.isKnownDatabase(dbQualifiedName);

        AtlasEntity ret = context.getEntity(dbQualifiedName);

        if (ret == null) {
            ret = new AtlasEntity(HIVE_TYPE_DB);

            // if this DB was sent in an earlier notification, set 'guid' to null - which will:
            //  - result in this entity to be not included in 'referredEntities'
            //  - cause Atlas server to resolve the entity by its qualifiedName
            if (isKnownDatabase) {
                ret.setGuid(null);
            }

            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName);
            ret.setAttribute(ATTRIBUTE_NAME, db.getName().toLowerCase());
            ret.setAttribute(ATTRIBUTE_DESCRIPTION, db.getDescription());
            ret.setAttribute(ATTRIBUTE_OWNER, db.getOwnerName());

            ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
            ret.setAttribute(ATTRIBUTE_LOCATION, db.getLocationUri());
            ret.setAttribute(ATTRIBUTE_PARAMETERS, db.getParameters());

            if (db.getOwnerType() != null) {
                ret.setAttribute(ATTRIBUTE_OWNER_TYPE, OWNER_TYPE_TO_ENUM_VALUE.get(db.getOwnerType().getValue()));
            }

            context.putEntity(dbQualifiedName, ret);
        }

        return ret;
    }

    /**
     * Builds the table entity together with its referred entities (database, storage
     * descriptor, columns).
     *
     * @param table Hive table
     * @return container holding the table entity and its referred entities
     * @throws Exception on failure to retrieve details from Hive
     */
    protected AtlasEntityWithExtInfo toTableEntity(Table table) throws Exception {
        AtlasEntityWithExtInfo ret = new AtlasEntityWithExtInfo();

        AtlasEntity entity = toTableEntity(table, ret);

        ret.setEntity(entity);

        return ret;
    }

    /**
     * Builds the table entity, adds it to {@code entities} and records its referred entities there.
     *
     * @param table    Hive table
     * @param entities container that receives the table entity and its referred entities
     * @return the table entity
     * @throws Exception on failure to retrieve details from Hive
     */
    protected AtlasEntity toTableEntity(Table table, AtlasEntitiesWithExtInfo entities) throws Exception {
        AtlasEntity ret = toTableEntity(table, (AtlasEntityExtInfo) entities);

        entities.addEntity(ret);

        return ret;
    }

    /**
     * Builds the table entity after resolving the entity of its database.
     *
     * @param table         Hive table
     * @param entityExtInfo container that receives referred entities; may be {@code null}
     * @return the table entity
     * @throws Exception on failure to retrieve details from Hive
     */
    protected AtlasEntity toTableEntity(Table table, AtlasEntityExtInfo entityExtInfo) throws Exception {
        AtlasEntity dbEntity = toDbEntity(getHive().getDatabase(table.getDbName()));

        if (entityExtInfo != null) {
            if (dbEntity != null) {
                entityExtInfo.addReferredEntity(dbEntity);
            }
        }

        AtlasEntity ret = toTableEntity(getObjectId(dbEntity), table, entityExtInfo);

        return ret;
    }

    /**
     * Returns the 'hive_table' entity for the given table, creating and caching it (along with
     * its storage descriptor, partition-key and column entities) on first use.
     *
     * @param dbId          object-id of the table's database entity
     * @param table         Hive table
     * @param entityExtInfo container that receives referred entities; may be {@code null}
     * @return the table entity
     * @throws Exception declared for callers; not thrown by the visible code paths
     */
    protected AtlasEntity toTableEntity(AtlasObjectId dbId, Table table, AtlasEntityExtInfo entityExtInfo) throws Exception {
        String  tblQualifiedName = getQualifiedName(table);
        boolean isKnownTable     = context.isKnownTable(tblQualifiedName);

        AtlasEntity ret = context.getEntity(tblQualifiedName);

        if (ret == null) {
            ret = new AtlasEntity(HIVE_TYPE_TABLE);

            // if this table was sent in an earlier notification, set 'guid' to null - which will:
            //  - result in this entity to be not included in 'referredEntities'
            //  - cause Atlas server to resolve the entity by its qualifiedName
            if (isKnownTable && !isAlterTableOperation()) {
                ret.setGuid(null);
            }

            long createTime     = getTableCreateTime(table);
            long lastAccessTime = table.getLastAccessTime() > 0 ? (table.getLastAccessTime() * MILLIS_CONVERT_FACTOR) : createTime;

            // read once; guards the 'comment' lookup below against a table without parameters
            Map<String, String> parameters = table.getParameters();

            ret.setAttribute(ATTRIBUTE_DB, dbId);
            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tblQualifiedName);
            ret.setAttribute(ATTRIBUTE_NAME, table.getTableName().toLowerCase());
            ret.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
            ret.setAttribute(ATTRIBUTE_CREATE_TIME, createTime);
            ret.setAttribute(ATTRIBUTE_LAST_ACCESS_TIME, lastAccessTime);
            ret.setAttribute(ATTRIBUTE_RETENTION, table.getRetention());
            ret.setAttribute(ATTRIBUTE_PARAMETERS, parameters);
            ret.setAttribute(ATTRIBUTE_COMMENT, parameters != null ? parameters.get(ATTRIBUTE_COMMENT) : null);
            ret.setAttribute(ATTRIBUTE_TABLE_TYPE, table.getTableType().name());
            ret.setAttribute(ATTRIBUTE_TEMPORARY, table.isTemporary());

            if (table.getViewOriginalText() != null) {
                ret.setAttribute(ATTRIBUTE_VIEW_ORIGINAL_TEXT, table.getViewOriginalText());
            }

            if (table.getViewExpandedText() != null) {
                ret.setAttribute(ATTRIBUTE_VIEW_EXPANDED_TEXT, table.getViewExpandedText());
            }

            AtlasObjectId     tableId       = getObjectId(ret);
            AtlasEntity       sd            = getStorageDescEntity(tableId, table);
            List<AtlasEntity> partitionKeys = getColumnEntities(tableId, table, table.getPartitionKeys());
            List<AtlasEntity> columns       = getColumnEntities(tableId, table, table.getCols());

            if (entityExtInfo != null) {
                entityExtInfo.addReferredEntity(sd);

                if (partitionKeys != null) {
                    for (AtlasEntity partitionKey : partitionKeys) {
                        entityExtInfo.addReferredEntity(partitionKey);
                    }
                }

                if (columns != null) {
                    for (AtlasEntity column : columns) {
                        entityExtInfo.addReferredEntity(column);
                    }
                }
            }

            ret.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sd));
            ret.setAttribute(ATTRIBUTE_PARTITION_KEYS, getObjectIds(partitionKeys));
            ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));

            context.putEntity(tblQualifiedName, ret);
        }

        return ret;
    }

    /**
     * Returns the 'hive_storagedesc' entity for the given table, creating and caching it on
     * first use.
     *
     * @param tableId object-id of the owning table; a {@code null} guid marks the table as already known
     * @param table   Hive table whose storage descriptor is converted
     * @return the storage descriptor entity
     */
    protected AtlasEntity getStorageDescEntity(AtlasObjectId tableId, Table table) {
        String  sdQualifiedName = getQualifiedName(table, table.getSd());
        boolean isKnownTable    = tableId.getGuid() == null;

        AtlasEntity ret = context.getEntity(sdQualifiedName);

        if (ret == null) {
            ret = new AtlasEntity(HIVE_TYPE_STORAGEDESC);

            // if sd's table was sent in an earlier notification, set 'guid' to null - which will:
            //  - result in this entity to be not included in 'referredEntities'
            //  - cause Atlas server to resolve the entity by its qualifiedName
            if (isKnownTable) {
                ret.setGuid(null);
            }

            StorageDescriptor sd = table.getSd();

            ret.setAttribute(ATTRIBUTE_TABLE, tableId);
            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, sdQualifiedName);
            ret.setAttribute(ATTRIBUTE_PARAMETERS, sd.getParameters());
            ret.setAttribute(ATTRIBUTE_LOCATION, sd.getLocation());
            ret.setAttribute(ATTRIBUTE_INPUT_FORMAT, sd.getInputFormat());
            ret.setAttribute(ATTRIBUTE_OUTPUT_FORMAT, sd.getOutputFormat());
            ret.setAttribute(ATTRIBUTE_COMPRESSED, sd.isCompressed());
            ret.setAttribute(ATTRIBUTE_NUM_BUCKETS, sd.getNumBuckets());
            ret.setAttribute(ATTRIBUTE_STORED_AS_SUB_DIRECTORIES, sd.isStoredAsSubDirectories());

            // null-safe, consistent with the sort-columns check below
            if (CollectionUtils.isNotEmpty(sd.getBucketCols())) {
                ret.setAttribute(ATTRIBUTE_BUCKET_COLS, sd.getBucketCols());
            }

            if (sd.getSerdeInfo() != null) {
                AtlasStruct serdeInfo   = new AtlasStruct(HIVE_TYPE_SERDE);
                SerDeInfo   sdSerDeInfo = sd.getSerdeInfo();

                serdeInfo.setAttribute(ATTRIBUTE_NAME, sdSerDeInfo.getName());
                serdeInfo.setAttribute(ATTRIBUTE_SERIALIZATION_LIB, sdSerDeInfo.getSerializationLib());
                serdeInfo.setAttribute(ATTRIBUTE_PARAMETERS, sdSerDeInfo.getParameters());

                ret.setAttribute(ATTRIBUTE_SERDE_INFO, serdeInfo);
            }

            if (CollectionUtils.isNotEmpty(sd.getSortCols())) {
                List<AtlasStruct> sortCols = new ArrayList<>(sd.getSortCols().size());

                for (Order sdSortCol : sd.getSortCols()) {
                    AtlasStruct sortcol = new AtlasStruct(HIVE_TYPE_ORDER);

                    sortcol.setAttribute("col", sdSortCol.getCol());
                    sortcol.setAttribute("order", sdSortCol.getOrder());

                    sortCols.add(sortcol);
                }

                ret.setAttribute(ATTRIBUTE_SORT_COLS, sortCols);
            }

            context.putEntity(sdQualifiedName, ret);
        }

        return ret;
    }

    /**
     * Returns 'hive_column' entities for the given field schemas, in the same order, creating
     * and caching any that are not yet in the context.
     *
     * @param tableId      object-id of the owning table; a {@code null} guid marks the table as already known
     * @param table        Hive table owning the columns
     * @param fieldSchemas columns or partition keys to convert; may be {@code null} or empty
     * @return one entity per field schema; an empty list when {@code fieldSchemas} is null or empty
     */
    protected List<AtlasEntity> getColumnEntities(AtlasObjectId tableId, Table table, List<FieldSchema> fieldSchemas) {
        List<AtlasEntity> ret          = new ArrayList<>();
        boolean           isKnownTable = tableId.getGuid() == null;

        if (CollectionUtils.isNotEmpty(fieldSchemas)) {
            int columnPosition = 0;

            for (FieldSchema fieldSchema : fieldSchemas) {
                String      colQualifiedName = getQualifiedName(table, fieldSchema);
                AtlasEntity column           = context.getEntity(colQualifiedName);

                if (column == null) {
                    column = new AtlasEntity(HIVE_TYPE_COLUMN);

                    // if column's table was sent in an earlier notification, set 'guid' to null - which will:
                    //  - result in this entity to be not included in 'referredEntities'
                    //  - cause Atlas server to resolve the entity by its qualifiedName
                    if (isKnownTable) {
                        column.setGuid(null);
                    }

                    column.setAttribute(ATTRIBUTE_TABLE, tableId);
                    column.setAttribute(ATTRIBUTE_QUALIFIED_NAME, colQualifiedName);
                    column.setAttribute(ATTRIBUTE_NAME, fieldSchema.getName());
                    column.setAttribute(ATTRIBUTE_OWNER, table.getOwner());
                    column.setAttribute(ATTRIBUTE_COL_TYPE, fieldSchema.getType());
                    column.setAttribute(ATTRIBUTE_COL_POSITION, columnPosition);
                    column.setAttribute(ATTRIBUTE_COMMENT, fieldSchema.getComment());

                    context.putEntity(colQualifiedName, column);
                }

                // advance for every field, including ones served from the context, so that
                // 'position' always equals the field's index in the schema list
                columnPosition++;

                ret.add(column);
            }
        }

        return ret;
    }

    /**
     * Returns the 'hdfs_path' entity for the given path, creating and caching it on first use.
     * The path is lower-cased before it is stored.
     *
     * @param path file-system path
     * @return the path entity
     */
    protected AtlasEntity getHDFSPathEntity(Path path) {
        String      strPath           = path.toString().toLowerCase();
        String      pathQualifiedName = getQualifiedName(strPath);
        AtlasEntity ret               = context.getEntity(pathQualifiedName);

        if (ret == null) {
            ret = new AtlasEntity(HDFS_TYPE_PATH);

            ret.setAttribute(ATTRIBUTE_PATH, strPath);
            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
            ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
            ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());

            context.putEntity(pathQualifiedName, ret);
        }

        return ret;
    }

    /**
     * Builds a new 'hive_process' entity describing the current query, linking the given
     * inputs and outputs. The query string is lower-cased; the end time is the current time.
     *
     * @param inputs  input entities of the process
     * @param outputs output entities of the process
     * @return a new process entity (not cached in the context)
     * @throws Exception on failure to compute the process qualifiedName
     */
    protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
        AtlasEntity ret         = new AtlasEntity(HIVE_TYPE_PROCESS);
        HookContext hookContext = getHiveContext();
        String      queryStr    = hookContext.getQueryPlan().getQueryStr();

        if (queryStr != null) {
            queryStr = queryStr.toLowerCase();
        }

        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(inputs, outputs));
        ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs));
        ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs));
        ret.setAttribute(ATTRIBUTE_NAME, queryStr);
        ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, hookContext.getOperationName());
        ret.setAttribute(ATTRIBUTE_START_TIME, hookContext.getQueryPlan().getQueryStartTime());
        ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis());
        ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
        ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
        ret.setAttribute(ATTRIBUTE_QUERY_ID, hookContext.getQueryPlan().getQuery().getQueryId());
        ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
        ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr));

        return ret;
    }

    /** @return the cluster name reported by the hook context */
    protected String getClusterName() {
        return context.getClusterName();
    }

    /** @return the Hive client held by the hook context */
    protected Hive getHive() {
        return context.getHive();
    }

    /** @return the Hive {@link HookContext} held by the hook context */
    protected HookContext getHiveContext() {
        return context.getHiveContext();
    }

    /**
     * Resolves the name of the user running the operation. Tries, in order: the user name in
     * the Hive context, the short user name of the context's UGI, the short user name of the
     * current UGI, and - only if obtaining the current UGI fails - the 'user.name' system property.
     *
     * @return the resolved user name; may be {@code null} or empty if no source yields one
     */
    protected String getUserName() {
        String ret = getHiveContext().getUserName();

        if (StringUtils.isEmpty(ret)) {
            UserGroupInformation ugi = getHiveContext().getUgi();

            if (ugi != null) {
                ret = ugi.getShortUserName();
            }

            if (StringUtils.isEmpty(ret)) {
                try {
                    ret = UserGroupInformation.getCurrentUser().getShortUserName();
                } catch (IOException e) {
                    LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e);
                    ret = System.getProperty("user.name");
                }
            }
        }


        return ret;
    }

    /**
     * Computes the qualifiedName of a Hive hook entity.
     *
     * @param entity Hive hook entity
     * @return the qualifiedName; {@code null} for entity types other than DATABASE, TABLE,
     *         PARTITION and DFS_DIR
     * @throws Exception declared for callers; not thrown by the visible code paths
     */
    protected String getQualifiedName(Entity entity) throws Exception {
        switch (entity.getType()) {
            case DATABASE:
                return getQualifiedName(entity.getDatabase());

            case TABLE:
            case PARTITION:
                return getQualifiedName(entity.getTable());

            case DFS_DIR:
                return getQualifiedName(entity.getLocation());

            default:
                return null;
        }
    }

    /**
     * @param db Hive database
     * @return {@code <dbname>@<cluster>} with the database name lower-cased
     */
    protected String getQualifiedName(Database db) {
        return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
    }

    /**
     * Computes {@code <db>.<table>@<cluster>} (db and table lower-cased). For temporary tables
     * the table name is suffixed with {@link #TEMP_TABLE_PREFIX} and the session id, or a
     * random alphanumeric string when no session id is available.
     *
     * @param table Hive table
     * @return qualifiedName of the table
     */
    protected String getQualifiedName(Table table) {
        String tableName = table.getTableName();

        if (table.isTemporary()) {
            SessionState sessionState = SessionState.get();

            if (sessionState != null && sessionState.getSessionId() != null) {
                tableName = tableName + TEMP_TABLE_PREFIX + sessionState.getSessionId();
            } else {
                // alphanumeric rather than arbitrary chars, so the name stays printable
                tableName = tableName + TEMP_TABLE_PREFIX + RandomStringUtils.randomAlphanumeric(10);
            }
        }

        return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
    }

    /**
     * @param table Hive table
     * @param sd    storage descriptor (not used in the computation)
     * @return the table's qualifiedName suffixed with {@code "_storage"}
     */
    protected String getQualifiedName(Table table, StorageDescriptor sd) {
        return getQualifiedName(table) + "_storage";
    }

    /**
     * Computes a column's qualifiedName by inserting {@code .<column>} (lower-cased) before the
     * last {@code '@'} of the table's qualifiedName, or appending it when there is no {@code '@'}.
     *
     * @param table  Hive table owning the column
     * @param column column schema
     * @return qualifiedName of the column
     */
    protected String getQualifiedName(Table table, FieldSchema column) {
        String tblQualifiedName = getQualifiedName(table);

        int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME);

        if (sepPos == -1) {
            return tblQualifiedName + QNAME_SEP_ENTITY_NAME + column.getName().toLowerCase();
        } else {
            return tblQualifiedName.substring(0, sepPos) + QNAME_SEP_ENTITY_NAME + column.getName().toLowerCase() + tblQualifiedName.substring(sepPos);
        }
    }

    /**
     * @param column lineage dependency key identifying an output column
     * @return qualifiedName of the column
     */
    protected String getQualifiedName(DependencyKey column) {
        String dbName    = column.getDataContainer().getTable().getDbName();
        String tableName = column.getDataContainer().getTable().getTableName();
        String colName   = column.getFieldSchema().getName();

        return getQualifiedName(dbName, tableName, colName);
    }

    /**
     * @param column lineage base-column info identifying an input column
     * @return qualifiedName of the column
     */
    protected String getQualifiedName(BaseColumnInfo column) {
        String dbName    = column.getTabAlias().getTable().getDbName();
        String tableName = column.getTabAlias().getTable().getTableName();
        String colName   = column.getColumn().getName();

        return getQualifiedName(dbName, tableName, colName);
    }

    /**
     * @return {@code <db>.<table>.<column>@<cluster>} with db, table and column lower-cased
     */
    protected String getQualifiedName(String dbName, String tableName, String colName) {
        return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
    }

    /**
     * @param location file-system location
     * @return qualifiedName of the lower-cased path
     */
    protected String getQualifiedName(URI location) {
        String strPath = new Path(location).toString().toLowerCase();

        return getQualifiedName(strPath);
    }

    /**
     * @param path path string
     * @return {@code <path>@<cluster>} for paths starting with {@code hdfs://}; otherwise the
     *         lower-cased path alone
     */
    protected String getQualifiedName(String path) {
        if (path.startsWith("hdfs://")) {
            return (path + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
        }

        return path.toLowerCase();
    }

    /**
     * Computes the qualifiedName of the process for the current operation.
     *
     * <p>For CREATETABLE, CREATETABLE_AS_SELECT, CREATEVIEW, ALTERVIEW_AS and ALTERTABLE_LOCATION
     * the name is {@code <table qualifiedName>:<createTime>} of the first TABLE output (outputs
     * sorted by name). Otherwise it is the operation name followed by the sorted inputs,
     * {@code "->"}, and the sorted outputs. The name is derived from the Hive context; the
     * {@code inputs} and {@code outputs} arguments are not read.</p>
     *
     * @param inputs  input entities (unused)
     * @param outputs output entities (unused)
     * @return qualifiedName of the process
     * @throws Exception on failure to retrieve table details from Hive
     */
    protected String getQualifiedName(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception {
        HiveOperation operation = context.getHiveOperation();

        if (operation == HiveOperation.CREATETABLE ||
            operation == HiveOperation.CREATETABLE_AS_SELECT ||
            operation == HiveOperation.CREATEVIEW ||
            operation == HiveOperation.ALTERVIEW_AS ||
            operation == HiveOperation.ALTERTABLE_LOCATION) {
            List<? extends Entity> sortedEntities = new ArrayList<>(getHiveContext().getOutputs());

            Collections.sort(sortedEntities, entityComparator);

            for (Entity entity : sortedEntities) {
                if (entity.getType() == Entity.Type.TABLE) {
                    Table table = entity.getTable();

                    table = getHive().getTable(table.getDbName(), table.getTableName());

                    long createTime = getTableCreateTime(table);

                    return getQualifiedName(table) + QNAME_SEP_PROCESS + createTime;
                }
            }
        }

        StringBuilder sb = new StringBuilder(getHiveContext().getOperationName());

        boolean ignoreHDFSPaths = ignoreHDFSPathsinProcessQualifiedName();

        addToProcessQualifiedName(sb, getHiveContext().getInputs(), ignoreHDFSPaths);
        sb.append("->");
        addToProcessQualifiedName(sb, getHiveContext().getOutputs(), ignoreHDFSPaths);

        return sb.toString();
    }

    /**
     * Decides whether directory entities are left out of the process qualifiedName: always for
     * QUERY; for LOAD/IMPORT when an output is a partition; for EXPORT when an input is a partition.
     */
    private boolean ignoreHDFSPathsinProcessQualifiedName() {
        switch (context.getHiveOperation()) {
            case LOAD:
            case IMPORT:
                return hasPartitionEntity(getHiveContext().getOutputs());
            case EXPORT:
                return hasPartitionEntity(getHiveContext().getInputs());
            case QUERY:
                return true;
            default:
                return false;
        }
    }

    /** Returns true when any of the given entities is of type PARTITION; false for {@code null}. */
    private boolean hasPartitionEntity(Collection<? extends Entity> entities) {
        if (entities != null) {
            for (Entity entity : entities) {
                if (entity.getType() == Entity.Type.PARTITION) {
                    return true;
                }
            }
        }

        return false;
    }

    /**
     * Appends {@code :<qualifiedName>[:<createTime>]} for each distinct entity (sorted by name)
     * to the process qualifiedName. For QUERY operations, output entities are additionally
     * prefixed with {@code :<writeType>} for INSERT, INSERT_OVERWRITE, UPDATE, DELETE and
     * non-local PATH_WRITE. Entities whose qualifiedName cannot be computed are logged and skipped.
     */
    private void addToProcessQualifiedName(StringBuilder processQualifiedName, Collection<? extends Entity> entities, boolean ignoreHDFSPaths) {
        if (entities == null) {
            return;
        }

        List<? extends Entity> sortedEntities = new ArrayList<>(entities);

        Collections.sort(sortedEntities, entityComparator);

        Set<String> dataSetsProcessed = new HashSet<>();

        for (Entity entity : sortedEntities) {
            if (ignoreHDFSPaths && (Entity.Type.DFS_DIR.equals(entity.getType()) || Entity.Type.LOCAL_DIR.equals(entity.getType()))) {
                continue;
            }

            String qualifiedName = null;
            long   createTime    = 0;

            try {
                if (entity.getType() == Entity.Type.PARTITION || entity.getType() == Entity.Type.TABLE) {
                    Table table = getHive().getTable(entity.getTable().getDbName(), entity.getTable().getTableName());

                    if (table != null) {
                        createTime    = getTableCreateTime(table);
                        qualifiedName = getQualifiedName(table);
                    }
                } else {
                    qualifiedName = getQualifiedName(entity);
                }
            } catch (Exception excp) {
                // best-effort: an entity that cannot be resolved is left out of the name
                LOG.error("error while computing qualifiedName for process", excp);
            }

            // skip unresolved entities and ones already included (e.g. several partitions of one table)
            if (qualifiedName == null || !dataSetsProcessed.add(qualifiedName)) {
                continue;
            }

            if (entity instanceof WriteEntity) { // output entity
                WriteEntity writeEntity = (WriteEntity) entity;

                if (writeEntity.getWriteType() != null && HiveOperation.QUERY.equals(context.getHiveOperation())) {
                    boolean addWriteType = false;

                    switch (writeEntity.getWriteType()) {
                        case INSERT:
                        case INSERT_OVERWRITE:
                        case UPDATE:
                        case DELETE:
                            addWriteType = true;
                        break;

                        case PATH_WRITE:
                            addWriteType = !Entity.Type.LOCAL_DIR.equals(entity.getType());
                        break;

                        default:
                        break;
                    }

                    if (addWriteType) {
                        processQualifiedName.append(QNAME_SEP_PROCESS).append(writeEntity.getWriteType().name());
                    }
                }
            }

            // literal replacement - no regex needed to strip '/'
            processQualifiedName.append(QNAME_SEP_PROCESS).append(qualifiedName.toLowerCase().replace("/", ""));

            if (createTime != 0) {
                processQualifiedName.append(QNAME_SEP_PROCESS).append(createTime);
            }
        }
    }

    /**
     * @return true when the current Hive operation is one of the ALTERTABLE_* / ALTERVIEW_*
     *         operations listed below
     */
    public boolean isAlterTableOperation() {
        switch (context.getHiveOperation()) {
            case ALTERTABLE_FILEFORMAT:
            case ALTERTABLE_CLUSTER_SORT:
            case ALTERTABLE_BUCKETNUM:
            case ALTERTABLE_PROPERTIES:
            case ALTERTABLE_SERDEPROPERTIES:
            case ALTERTABLE_SERIALIZER:
            case ALTERTABLE_ADDCOLS:
            case ALTERTABLE_REPLACECOLS:
            case ALTERTABLE_PARTCOLTYPE:
            case ALTERTABLE_LOCATION:
            case ALTERTABLE_RENAME:
            case ALTERTABLE_RENAMECOL:
            case ALTERVIEW_PROPERTIES:
            case ALTERVIEW_RENAME:
                return true;

            default:
                return false;
        }
    }


    /**
     * Orders Hive entities case-insensitively by name. When either entity has no name, both
     * are compared by the string form of {@code getD()} instead.
     */
    static final class EntityComparator implements Comparator<Entity> {
        @Override
        public int compare(Entity entity1, Entity entity2) {
            String name1 = entity1.getName();
            String name2 = entity2.getName();

            if (name1 == null || name2 == null) {
                name1 = entity1.getD().toString();
                name2 = entity2.getD().toString();
            }

            return name1.toLowerCase().compareTo(name2.toLowerCase());
        }
    }

    static final Comparator<Entity> entityComparator = new EntityComparator();
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java new file mode 100644 index 0000000..d5df9e7 --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java @@ -0,0 +1,75 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.hive.hook.events;

import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequestV2;
import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;

/**
 * Hook event that produces a single entity-create notification carrying the database
 * entities found among the outputs of the current Hive operation.
 */
public class CreateDatabase extends BaseHiveEvent {
    private static final Logger LOG = LoggerFactory.getLogger(CreateDatabase.class);

    public CreateDatabase(AtlasHiveHookContext context) {
        super(context);
    }

    /**
     * Wraps the entities from {@link #getEntities()} in one {@link EntityCreateRequestV2}.
     *
     * @return a single-element list holding the create request
     * @throws Exception on failure to build the entities
     */
    @Override
    public List<HookNotificationMessage> getNotificationMessages() throws Exception {
        AtlasEntitiesWithExtInfo dbEntities = getEntities();
        String                   userName   = getUserName();

        HookNotificationMessage createRequest = new EntityCreateRequestV2(userName, dbEntities);

        return Collections.singletonList(createRequest);
    }

    /**
     * Collects a database entity for every DATABASE output of the Hive operation. Each
     * database is re-read from Hive by name; outputs whose database cannot be obtained are
     * logged and left out.
     *
     * @return the database entities, with the context's processed entities added
     * @throws Exception on failure to retrieve a database from Hive
     */
    public AtlasEntitiesWithExtInfo getEntities() throws Exception {
        AtlasEntitiesWithExtInfo result = new AtlasEntitiesWithExtInfo();

        for (Entity output : getHiveContext().getOutputs()) {
            if (output.getType() != Entity.Type.DATABASE) {
                continue;
            }

            Database hookDb = output.getDatabase();
            Database db     = (hookDb == null) ? null : getHive().getDatabase(hookDb.getName());

            if (db == null) {
                LOG.error("CreateDatabase.getEntities(): failed to retrieve db");

                continue;
            }

            AtlasEntity dbEntity = toDbEntity(db);

            result.addEntity(dbEntity);
        }

        addProcessedEntities(result);

        return result;
    }
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java ----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java new file mode 100644 index 0000000..43016d4 --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.hive.hook.events; + +import org.apache.atlas.hive.hook.AtlasHiveHookContext; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequestV2; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyKey; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +/** Hook event that emits a hive process entity, plus one column-lineage entity per resolvable lineage entry, linking the inputs and outputs of the current Hive operation. */ public class CreateHiveProcess extends BaseHiveEvent { + private static final Logger LOG = LoggerFactory.getLogger(CreateHiveProcess.class); + + /** Hands the hook context to the BaseHiveEvent constructor. */ public CreateHiveProcess(AtlasHiveHookContext context) { + super(context); + } + + /** Returns a single-element immutable list holding an EntityCreateRequestV2 for getEntities(), or null when getEntities() returns null. */ @Override + public List<HookNotificationMessage> getNotificationMessages() throws Exception { + AtlasEntitiesWithExtInfo entities = getEntities(); + List<HookNotificationMessage> ret = entities != null ?
Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities)) : null; + + return ret; + } + + /** Builds the process entity from the hook inputs and outputs, de-duplicated by qualified name; returns null when skipProcess() is true or when no input or output entity could be resolved. */ public AtlasEntitiesWithExtInfo getEntities() throws Exception { + AtlasEntitiesWithExtInfo ret = null; + + if (!skipProcess()) { + List<AtlasEntity> inputs = new ArrayList<>(); + List<AtlasEntity> outputs = new ArrayList<>(); + HookContext hiveContext = getHiveContext(); + /* shared by both loops below, so an output whose qualified name already appeared among the inputs is skipped as well */ Set<String> processedNames = new HashSet<>(); + + ret = new AtlasEntitiesWithExtInfo(); + + if (hiveContext.getInputs() != null) { + for (ReadEntity input : hiveContext.getInputs()) { + String qualifiedName = getQualifiedName(input); + + if (qualifiedName == null || !processedNames.add(qualifiedName)) { + continue; + } + + AtlasEntity entity = getInputOutputEntity(input, ret); + + if (entity != null) { + inputs.add(entity); + } + } + } + + if (hiveContext.getOutputs() != null) { + for (WriteEntity output : hiveContext.getOutputs()) { + String qualifiedName = getQualifiedName(output); + + if (qualifiedName == null || !processedNames.add(qualifiedName)) { + continue; + } + + AtlasEntity entity = getInputOutputEntity(output, ret); + + if (entity != null) { + outputs.add(entity); + } + } + } + + if (!inputs.isEmpty() || !outputs.isEmpty()) { + AtlasEntity process = getHiveProcessEntity(inputs, outputs); + + ret.addEntity(process); + + processColumnLineage(process, ret); + + addProcessedEntities(ret); + } else { + ret = null; + } + } + + return ret; + } + + /** Adds one HIVE_TYPE_COLUMN_LINEAGE entity to entities for every lineage entry whose output column and at least one input column are known to the hook context; unknown columns are logged at WARN and skipped. */ private void processColumnLineage(AtlasEntity hiveProcess, AtlasEntitiesWithExtInfo entities) { + LineageInfo lineageInfo = getHiveContext().getLinfo(); + + if (lineageInfo == null || CollectionUtils.isEmpty(lineageInfo.entrySet())) { + return; + } + + for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) { + String outputColName = getQualifiedName(entry.getKey()); + AtlasEntity outputColumn = context.getEntity(outputColName); + + if (outputColumn == null) { + LOG.warn("column-lineage: non-existing output-column {}",
outputColName); + + continue; + } + + List<AtlasEntity> inputColumns = new ArrayList<>(); + + for (BaseColumnInfo baseColumn : entry.getValue().getBaseCols()) { + String inputColName = getQualifiedName(baseColumn); + AtlasEntity inputColumn = context.getEntity(inputColName); + + if (inputColumn == null) { + LOG.warn("column-lineage: non-existing input-column {} for output-column={}", inputColName, outputColName); + + continue; + } + + inputColumns.add(inputColumn); + } + + if (inputColumns.isEmpty()) { + continue; + } + + AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE); + + /* both the name and the qualified name are the process value suffixed with ":" and the output column's ATTRIBUTE_NAME */ columnLineageProcess.setAttribute(ATTRIBUTE_NAME, hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); + columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + ":" + outputColumn.getAttribute(ATTRIBUTE_NAME)); + columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns)); + columnLineageProcess.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(getObjectId(outputColumn))); + columnLineageProcess.setAttribute(ATTRIBUTE_QUERY, getObjectId(hiveProcess)); + columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, entry.getValue().getType()); + columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, entry.getValue().getExpr()); + + entities.addEntity(columnLineageProcess); + } + } + + /** Returns true when the hook has neither inputs nor outputs, or when a QUERY operation writes its single output to a temporary DFS or local directory with PATH_WRITE; a null output set is tolerated. */ private boolean skipProcess() { + Set<ReadEntity> inputs = getHiveContext().getInputs(); + Set<WriteEntity> outputs = getHiveContext().getOutputs(); + + boolean ret = CollectionUtils.isEmpty(inputs) && CollectionUtils.isEmpty(outputs); + + if (!ret) { + if (getContext().getHiveOperation() == HiveOperation.QUERY) { + /* Select query has only one output; outputs can still be null here when only inputs are present, hence the null guard */ + if (outputs != null && outputs.size() == 1) { + WriteEntity output = outputs.iterator().next(); + + if (output.getType() == Entity.Type.DFS_DIR || output.getType() == Entity.Type.LOCAL_DIR) { + if (output.getWriteType() ==
WriteEntity.WriteType.PATH_WRITE && output.isTempURI()) { + ret = true; + } + } + + } + } + } + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java new file mode 100644 index 0000000..142f591 --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.atlas.hive.hook.events; + +import org.apache.atlas.hive.hook.AtlasHiveHookContext; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequestV2; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.metadata.Table; + +import java.util.Collections; +import java.util.List; + +/** Hook event that turns the first usable TABLE-typed output of the current Hive hook context into an Atlas entity-create notification. */ public class CreateTable extends BaseHiveEvent { + private final boolean skipTempTables; + + /** Hands the hook context to the BaseHiveEvent constructor and records whether temporary, non-external tables are to be skipped. */ public CreateTable(AtlasHiveHookContext context, boolean skipTempTables) { + super(context); + + this.skipTempTables = skipTempTables; + } + + /** Returns a single-element immutable list holding one EntityCreateRequestV2 built from getUserName() and the result of getEntities(). */ @Override + public List<HookNotificationMessage> getNotificationMessages() throws Exception { + AtlasEntitiesWithExtInfo entities = getEntities(); + HookNotificationMessage notification = new EntityCreateRequestV2(getUserName(), entities); + List<HookNotificationMessage> ret = Collections.singletonList(notification); + + return ret; + } + + /** Picks the first TABLE-typed output that getHive() can look up and that is not a skipped temporary table, converts it with toTableEntity, and for an external table also adds a process entity linking its data location to the table; addProcessedEntities(ret) is always called before returning. */ public AtlasEntitiesWithExtInfo getEntities() throws Exception { + AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); + Table table = null; + + for (Entity entity : getHiveContext().getOutputs()) { + if (entity.getType() == Entity.Type.TABLE) { + table = entity.getTable(); + + if (table != null) { + /* only the table is looked up again; the previous extra getDatabase() lookup stored a value that was never read */ table = getHive().getTable(table.getDbName(), table.getTableName()); + + if (table != null) { + // If it's an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage.
+ if (skipTempTables && table.isTemporary() && !TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + table = null; + } else { + break; + } + } + } + } + } + + if (table != null) { + AtlasEntity tblEntity = toTableEntity(table, ret); + + if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation()); + AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity)); + + ret.addEntity(processEntity); + ret.addReferredEntity(hdfsPathEntity); + } + } + + addProcessedEntities(ret); + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java new file mode 100644 index 0000000..33a998c --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hive.hook.events; + +import org.apache.atlas.hive.hook.AtlasHiveHookContext; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequestV2; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.hadoop.hive.ql.hooks.Entity; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Hook event that turns the DATABASE- and TABLE-typed outputs of the current Hive hook context into an Atlas entity-delete notification. */ public class DropDatabase extends BaseHiveEvent { + /** Hands the hook context to the BaseHiveEvent constructor. */ public DropDatabase(AtlasHiveHookContext context) { + super(context); + } + + /** Returns a single-element immutable list holding one EntityDeleteRequestV2 built from getUserName() and the result of getEntities(). */ @Override + public List<HookNotificationMessage> getNotificationMessages() throws Exception { + List<AtlasObjectId> entities = getEntities(); + HookNotificationMessage notification = new EntityDeleteRequestV2(getUserName(), entities); + List<HookNotificationMessage> ret = Collections.singletonList(notification); + + return ret; + } + + /** Returns one AtlasObjectId, keyed by qualified name, per DATABASE or TABLE output, and removes each of them from the hook context's known databases or known tables; outputs of any other type are ignored. */ public List<AtlasObjectId> getEntities() throws Exception { + List<AtlasObjectId> ret = new ArrayList<>(); + + for (Entity entity : getHiveContext().getOutputs()) { + if (entity.getType() == Entity.Type.DATABASE) { + String dbQName = getQualifiedName(entity.getDatabase()); + AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_DB, ATTRIBUTE_QUALIFIED_NAME, dbQName); + + context.removeFromKnownDatabase(dbQName); + + ret.add(dbId); + } else if (entity.getType() == Entity.Type.TABLE) { + String tblQName = getQualifiedName(entity.getTable()); + /* despite its name, dbId in this branch identifies a table: it is built with HIVE_TYPE_TABLE */ AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); + + context.removeFromKnownTable(tblQName); + + ret.add(dbId); + } + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java ---------------------------------------------------------------------- diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java new file mode 100644 index 0000000..6862f53 --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.hive.hook.events; + +import org.apache.atlas.hive.hook.AtlasHiveHookContext; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequestV2; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.hadoop.hive.ql.hooks.Entity; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Hook event that turns the TABLE-typed outputs of the current Hive hook context into an Atlas entity-delete notification. */ public class DropTable extends BaseHiveEvent { + /** Hands the hook context to the BaseHiveEvent constructor. */ public DropTable(AtlasHiveHookContext context) { + super(context); + } + + /** Returns a single-element immutable list holding one EntityDeleteRequestV2 built from getUserName() and the result of getEntities(). */ @Override + public List<HookNotificationMessage> getNotificationMessages() throws Exception { + List<AtlasObjectId> entities = getEntities(); + HookNotificationMessage notification = new EntityDeleteRequestV2(getUserName(), entities); + List<HookNotificationMessage> ret = Collections.singletonList(notification); + + return ret; + } + + /** Returns one AtlasObjectId, keyed by qualified name, per TABLE output, and removes each table from the hook context's known tables; outputs of any other type are ignored. */ public List<AtlasObjectId> getEntities() throws Exception { + List<AtlasObjectId> ret = new ArrayList<>(); + + for (Entity entity : getHiveContext().getOutputs()) { + if (entity.getType() == Entity.Type.TABLE) { + String tblQName = getQualifiedName(entity.getTable()); + /* despite its name, dbId identifies a table: it is built with HIVE_TYPE_TABLE */ AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); + + context.removeFromKnownTable(tblQName); + + ret.add(dbId); + } + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java deleted file mode 100644 index 3a2506b..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF)
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.hive.rewrite; - - -import org.apache.hadoop.hive.ql.parse.ASTNode; - -public interface ASTRewriter { - - void rewrite(RewriteContext ctx, ASTNode node) throws RewriteException; -} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java deleted file mode 100644 index 4cd219f..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.hive.rewrite; - - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.parse.ASTNode; -import org.apache.hadoop.hive.ql.parse.ParseDriver; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.parse.ParseException; -import org.apache.hadoop.hive.ql.parse.ParseUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -public class HiveASTRewriter { - - private Context queryContext; - private RewriteContext rwCtx; - private List<ASTRewriter> rewriters = new ArrayList<>(); - - private static final Logger LOG = LoggerFactory.getLogger(HiveASTRewriter.class); - - public HiveASTRewriter(HiveConf conf) throws RewriteException { - try { - queryContext = new Context(conf); - setUpRewriters(); - } catch (IOException e) { - throw new RewriteException("Exception while rewriting query : " , e); - } - } - - private void setUpRewriters() throws RewriteException { - ASTRewriter rewriter = new LiteralRewriter(); - rewriters.add(rewriter); - } - - public String rewrite(String sourceQry) throws RewriteException { - String result = sourceQry; - ASTNode tree = null; - try { - ParseDriver pd = new ParseDriver(); - tree = pd.parse(sourceQry, queryContext, true); - tree = ParseUtils.findRootNonNullToken(tree); - this.rwCtx = new RewriteContext(sourceQry, tree, queryContext.getTokenRewriteStream()); - rewrite(tree); - result = toSQL(); - } catch (ParseException e) { - LOG.error("Could not parse the query 
{} ", sourceQry, e); - throw new RewriteException("Could not parse query : " , e); - } - return result; - } - - private void rewrite(ASTNode origin) throws RewriteException { - ASTNode node = origin; - if (node != null) { - for(ASTRewriter rewriter : rewriters) { - rewriter.rewrite(rwCtx, node); - } - if (node.getChildren() != null) { - for (int i = 0; i < node.getChildren().size(); i++) { - rewrite((ASTNode) node.getChild(i)); - } - } - } - } - - public String toSQL() { - return rwCtx.getTokenRewriteStream().toString(); - } - - public String printAST() { - return rwCtx.getOriginNode().dump(); - } - -} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java deleted file mode 100644 index 789b981..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.atlas.hive.rewrite; - -import org.apache.hadoop.hive.ql.parse.ASTNode; -import org.apache.hadoop.hive.ql.parse.HiveParser; - -import java.util.HashMap; -import java.util.Map; - -public class LiteralRewriter implements ASTRewriter { - - public static Map<Integer, String> LITERAL_TOKENS = new HashMap<Integer, String>() {{ - put(HiveParser.Number, "NUMBER_LITERAL"); - put(HiveParser.Digit, "DIGIT_LITERAL"); - put(HiveParser.HexDigit, "HEX_LITERAL"); - put(HiveParser.Exponent, "EXPONENT_LITERAL"); - put(HiveParser.StringLiteral, "'STRING_LITERAL'"); - put(HiveParser.BigintLiteral, "BIGINT_LITERAL"); - put(HiveParser.SmallintLiteral, "SMALLINT_LITERAL"); - put(HiveParser.TinyintLiteral, "TINYINT_LITERAL"); - put(HiveParser.DecimalLiteral, "DECIMAL_LITERAL"); - put(HiveParser.ByteLengthLiteral, "BYTE_LENGTH_LITERAL"); - put(HiveParser.TOK_STRINGLITERALSEQUENCE, "'STRING_LITERAL_SEQ'"); - put(HiveParser.TOK_CHARSETLITERAL, "'CHARSET_LITERAL'"); - put(HiveParser.KW_TRUE, "BOOLEAN_LITERAL"); - put(HiveParser.KW_FALSE, "BOOLEAN_LITERAL"); - }}; - - - @Override - public void rewrite(RewriteContext ctx, final ASTNode node) throws RewriteException { - try { - processLiterals(ctx, node); - } catch(Exception e) { - throw new RewriteException("Could not normalize query", e); - } - } - - - private void processLiterals(final RewriteContext ctx, final ASTNode node) { - // Take child ident.totext - if (isLiteral(node)) { - replaceLiteral(ctx, node); - } - } - - private boolean isLiteral(ASTNode node) { - if (LITERAL_TOKENS.containsKey(node.getType())) { - return true; - } - return false; - } - - void replaceLiteral(RewriteContext ctx, ASTNode valueNode) { - //Reset the token stream - String literalVal = LITERAL_TOKENS.get(valueNode.getType()); - ctx.getTokenRewriteStream().replace(valueNode.getTokenStartIndex(), - valueNode.getTokenStopIndex(), literalVal); - } -} 
http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java deleted file mode 100644 index 505616e..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.atlas.hive.rewrite; - -import org.antlr.runtime.TokenRewriteStream; -import org.apache.hadoop.hive.ql.parse.ASTNode; - -public class RewriteContext { - - private String origQuery; - - private TokenRewriteStream rewriteStream; - - private ASTNode origin; - - RewriteContext(String origQuery, ASTNode origin, TokenRewriteStream rewriteStream) { - this.origin = origin; - this.rewriteStream = rewriteStream; - } - - public TokenRewriteStream getTokenRewriteStream() { - return rewriteStream; - } - - public ASTNode getOriginNode() { - return origin; - } - - public String getOriginalQuery() { - return origQuery; - } - -} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java deleted file mode 100644 index c87bf6b..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.hive.rewrite; - -public class RewriteException extends Exception { - public RewriteException(final String message, final Exception exception) { - super(message, exception); - } -}
