This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 1097d97 ATLAS-3938 : Deletion of non-existing Hive entities
1097d97 is described below
commit 1097d97c7e70f304ca1fb9bb9e9480a657ab5de4
Author: Pinal <pinal-shah>
AuthorDate: Fri Sep 11 17:37:22 2020 +0530
ATLAS-3938 : Deletion of non-existing Hive entities
Signed-off-by: nixonrodrigues <[email protected]>
---
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 243 ++++++++++++++++++++-
.../main/java/org/apache/atlas/AtlasClientV2.java | 32 ++-
2 files changed, 263 insertions(+), 12 deletions(-)
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index f18d01b..e7caf91 100755
---
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -26,6 +26,8 @@ import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hive.hook.events.BaseHiveEvent;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
@@ -89,6 +92,8 @@ public class HiveMetaStoreBridge {
public static final String SEP = ":".intern();
public static final String HDFS_PATH = "hdfs_path";
public static final String DB = "db";
+ public static final String HIVE_TABLE_DB_EDGE_LABEL =
"__hive_table.db";
+ public static final String HOOK_HIVE_PAGE_LIMIT = CONF_PREFIX +
"page.limit";
private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_FAILED = 1;
@@ -98,6 +103,8 @@ public class HiveMetaStoreBridge {
private final Hive hiveClient;
private final AtlasClientV2 atlasClientV2;
private final boolean convertHdfsPathToLowerCase;
+ private final Configuration atlasConf;
+ private final int pageLimit;
public static void main(String[] args) {
@@ -110,9 +117,13 @@ public class HiveMetaStoreBridge {
options.addOption("t", "table", true, "Table name");
options.addOption("f", "filename", true, "Filename");
options.addOption("failOnError", false, "failOnError");
+ options.addOption("deleteNonExisting", false, "Delete database and
table entities in Atlas if not present in Hive");
+
+ CommandLine cmd = new BasicParser().parse(options,
args);
+ boolean failOnError = cmd.hasOption("failOnError");
+ boolean deleteNonExisting =
cmd.hasOption("deleteNonExisting");
+ LOG.info("delete non existing flag : {} ", deleteNonExisting);
- CommandLine cmd = new BasicParser().parse(options,
args);
- boolean failOnError = cmd.hasOption("failOnError");
String databaseToImport = cmd.getOptionValue("d");
String tableToImport = cmd.getOptionValue("t");
String fileToImport = cmd.getOptionValue("f");
@@ -136,7 +147,9 @@ public class HiveMetaStoreBridge {
HiveMetaStoreBridge hiveMetaStoreBridge = new
HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2);
- if (StringUtils.isNotEmpty(fileToImport)) {
+ if (deleteNonExisting) {
+
hiveMetaStoreBridge.deleteEntitiesForNonExistingHiveMetadata(failOnError);
+ } else if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
if (f.exists() && f.canRead()) {
@@ -200,6 +213,8 @@ public class HiveMetaStoreBridge {
System.out.println(" database1:tbl1");
System.out.println(" database1:tbl2");
System.out.println(" database2:tbl2");
+ System.out.println("Usage 5: import-hive.sh [-deleteNonExisting] " );
+ System.out.println(" Deletes databases and tables which are not in
Hive ...");
System.out.println();
}
@@ -208,7 +223,7 @@ public class HiveMetaStoreBridge {
* @param hiveConf {@link HiveConf} for Hive component in the cluster
*/
public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf
hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
- this(atlasProperties.getString(HIVE_CLUSTER_NAME,
DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2,
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false));
+ this(atlasProperties, atlasProperties.getString(HIVE_CLUSTER_NAME,
DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2,
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false));
}
/**
@@ -220,14 +235,20 @@ public class HiveMetaStoreBridge {
}
HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2
atlasClientV2) {
- this(clusterName, hiveClient, atlasClientV2, true);
+ this(null, clusterName, hiveClient, atlasClientV2, true);
}
- HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2
atlasClientV2, boolean convertHdfsPathToLowerCase) {
+ HiveMetaStoreBridge(Configuration atlasConf, String clusterName, Hive
hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) {
this.clusterName = clusterName;
this.hiveClient = hiveClient;
this.atlasClientV2 = atlasClientV2;
this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase;
+ this.atlasConf = atlasConf;
+ if (atlasConf != null) {
+ pageLimit = atlasConf.getInteger(HOOK_HIVE_PAGE_LIMIT, 10000);
+ } else {
+ pageLimit = 10000;
+ }
}
public String getClusterName() {
@@ -877,4 +898,214 @@ public class HiveMetaStoreBridge {
public static long getTableCreatedTime(Table table) {
return table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR;
}
+
+ private List<AtlasEntityHeader> getAllDatabaseInCluster() throws
AtlasServiceException {
+
+ List<AtlasEntityHeader> entities = new ArrayList<>();
+ final int pageSize = pageLimit;
+
+ SearchParameters.FilterCriteria fc = new
SearchParameters.FilterCriteria();
+ fc.setAttributeName(ATTRIBUTE_CLUSTER_NAME);
+ fc.setAttributeValue(clusterName);
+ fc.setOperator(SearchParameters.Operator.EQ);
+
+ for (int i = 0; ; i++) {
+ int offset = pageSize * i;
+ LOG.info("Retrieving databases: offset={}, pageSize={}", offset,
pageSize);
+
+ AtlasSearchResult searchResult =
atlasClientV2.basicSearch(HIVE_TYPE_DB, fc,null, null, true, pageSize, offset);
+
+ List<AtlasEntityHeader> entityHeaders = searchResult == null ?
null : searchResult.getEntities();
+ int dbCount = entityHeaders == null ? 0
: entityHeaders.size();
+
+ LOG.info("Retrieved {} databases of {} cluster", dbCount,
clusterName);
+
+ if (dbCount > 0) {
+ entities.addAll(entityHeaders);
+ }
+
+ if (dbCount < pageSize) { // last page
+ break;
+ }
+ }
+
+ return entities;
+ }
+
+ private List<AtlasEntityHeader> getAllTablesInDb(String databaseGuid)
throws AtlasServiceException {
+
+ List<AtlasEntityHeader> entities = new ArrayList<>();
+ final int pageSize = pageLimit;
+
+ for (int i = 0; ; i++) {
+ int offset = pageSize * i;
+ LOG.info("Retrieving tables: offset={}, pageSize={}", offset,
pageSize);
+
+ AtlasSearchResult searchResult =
atlasClientV2.getRelationship(databaseGuid, HIVE_TABLE_DB_EDGE_LABEL, true,
pageSize, offset);
+
+ List<AtlasEntityHeader> entityHeaders = searchResult == null ?
null : searchResult.getEntities();
+ int tableCount = entityHeaders == null ? 0
: entityHeaders.size();
+
+ LOG.info("Retrieved {} tables of {} database", tableCount,
databaseGuid);
+
+ if (tableCount > 0) {
+ entities.addAll(entityHeaders);
+ }
+
+ if (tableCount < pageSize) { // last page
+ break;
+ }
+ }
+
+ return entities;
+ }
+
+ public String getHiveDatabaseName(String qualifiedName) {
+
+ if (StringUtils.isNotEmpty(qualifiedName)) {
+ String[] split = qualifiedName.split("@");
+ if (split.length > 0) {
+ return split[0];
+ }
+ }
+ return null;
+ }
+
+
+ public String getHiveTableName(String qualifiedName, boolean isTemporary) {
+
+ if (StringUtils.isNotEmpty(qualifiedName)) {
+ String tableName = StringUtils.substringBetween(qualifiedName,
".", "@");
+ if (!isTemporary) {
+ return tableName;
+ } else {
+ if (StringUtils.isNotEmpty(tableName)) {
+ String[] splitTemp = tableName.split(TEMP_TABLE_PREFIX);
+ if (splitTemp.length > 0) {
+ return splitTemp[0];
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private void deleteByGuid(List<String> guidTodelete) throws
AtlasServiceException {
+
+ if (CollectionUtils.isNotEmpty(guidTodelete)) {
+
+ for (String guid : guidTodelete) {
+ EntityMutationResponse response =
atlasClientV2.deleteEntityByGuid(guid);
+
+ if (response.getDeletedEntities().size() < 1) {
+ LOG.info("Entity with guid : {} is not deleted", guid);
+ } else {
+ LOG.info("Entity with guid : {} is deleted", guid);
+ }
+ }
+ } else {
+ LOG.info("No Entity to delete from Atlas");
+ }
+ }
+
+ public void deleteEntitiesForNonExistingHiveMetadata(boolean failOnError)
throws Exception {
+
+ //fetch databases from Atlas
+ List<AtlasEntityHeader> dbs = null;
+ try {
+ dbs = getAllDatabaseInCluster();
+ LOG.info("Total Databases in cluster {} : {} ", clusterName,
dbs.size());
+ } catch (AtlasServiceException e) {
+ LOG.error("Failed to retrieve database entities for cluster {}
from Atlas", clusterName, e);
+ if (failOnError) {
+ throw e;
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(dbs)) {
+ //iterate all dbs to check if exists in hive
+ for (AtlasEntityHeader db : dbs) {
+
+ String dbGuid = db.getGuid();
+ String hiveDbName = getHiveDatabaseName((String)
db.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+
+ if (StringUtils.isEmpty(hiveDbName)) {
+ LOG.error("Failed to get database from qualifiedName: {},
guid: {} ", db.getAttribute(ATTRIBUTE_QUALIFIED_NAME), dbGuid);
+ continue;
+ }
+
+ List<AtlasEntityHeader> tables;
+ try {
+ tables = getAllTablesInDb(dbGuid);
+ LOG.info("Total Tables in database {} : {} ", hiveDbName,
tables.size());
+ } catch (AtlasServiceException e) {
+ LOG.error("Failed to retrieve table entities for database
{} from Atlas", hiveDbName, e);
+ if (failOnError) {
+ throw e;
+ }
+ continue;
+ }
+
+ List<String> guidsToDelete = new ArrayList<>();
+ if (!hiveClient.databaseExists(hiveDbName)) {
+
+ //table guids
+ if (CollectionUtils.isNotEmpty(tables)) {
+ for (AtlasEntityHeader table : tables) {
+ guidsToDelete.add(table.getGuid());
+ }
+ }
+
+ //db guid
+ guidsToDelete.add(db.getGuid());
+ LOG.info("Added database {}.{} and its {} tables to
delete", clusterName, hiveDbName, tables.size());
+
+ } else {
+ //iterate all table of db to check if it exists
+ if (CollectionUtils.isNotEmpty(tables)) {
+ for (AtlasEntityHeader table : tables) {
+ String hiveTableName = getHiveTableName((String)
table.getAttribute(ATTRIBUTE_QUALIFIED_NAME), true);
+
+ if (StringUtils.isEmpty(hiveTableName)) {
+ LOG.error("Failed to get table from
qualifiedName: {}, guid: {} ", table.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
table.getGuid());
+ continue;
+ }
+
+ try {
+ hiveClient.getTable(hiveDbName, hiveTableName,
true);
+ } catch (InvalidTableException e) { //table
doesn't exists
+ LOG.info("Added table {}.{} to delete",
hiveDbName, hiveTableName);
+
+ guidsToDelete.add(table.getGuid());
+ } catch (HiveException e) {
+ LOG.error("Failed to get table {}.{} from
Hive", hiveDbName, hiveTableName, e);
+
+ if (failOnError) {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
+ //delete entities
+ if (CollectionUtils.isNotEmpty(guidsToDelete)) {
+ try {
+ deleteByGuid(guidsToDelete);
+ } catch (AtlasServiceException e) {
+ LOG.error("Failed to delete Atlas entities for
database {}", hiveDbName, e);
+
+ if (failOnError) {
+ throw e;
+ }
+ }
+
+ }
+ }
+
+ } else {
+ LOG.info("No database found in service.");
+ }
+
+ }
}
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 6968e83..da98a7e 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -67,6 +67,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String DSL_URI = DISCOVERY_URI + "/dsl";
private static final String FULL_TEXT_URI = DISCOVERY_URI +
"/fulltext";
private static final String BASIC_SEARCH_URI = DISCOVERY_URI + "/basic";
+ private static final String RELATIONSHIP_URI = DISCOVERY_URI +
"/relationship";
private static final String FACETED_SEARCH_URI = BASIC_SEARCH_URI;
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword)
{
@@ -362,17 +363,35 @@ public class AtlasClientV2 extends AtlasBaseClient {
return callAPI(API_V2.FULL_TEXT_SEARCH, AtlasSearchResult.class,
queryParams);
}
- public AtlasSearchResult basicSearch(final String typeName, final String
classification, final String query,
+ public AtlasSearchResult basicSearch(String typeName, String
classification, String query, boolean excludeDeletedEntities, int limit, int
offset) throws AtlasServiceException {
+ return this.basicSearch(typeName, null, classification, query,
excludeDeletedEntities, limit, offset);
+ }
+
+ public AtlasSearchResult basicSearch(final String typeName, final
SearchParameters.FilterCriteria entityFilters, final String classification,
final String query,
final boolean excludeDeletedEntities,
final int limit, final int offset) throws AtlasServiceException {
+ SearchParameters parameters = new SearchParameters();
+ parameters.setTypeName(typeName);
+ parameters.setClassification(classification);
+ parameters.setQuery(query);
+ parameters.setExcludeDeletedEntities(excludeDeletedEntities);
+ parameters.setLimit(limit);
+ parameters.setOffset(offset);
+ if (entityFilters != null){
+ parameters.setEntityFilters(entityFilters);
+ }
+ return callAPI(API_V2.BASIC_SEARCH, AtlasSearchResult.class,
parameters);
+ }
+
+ public AtlasSearchResult getRelationship(final String guid, String
relation, final boolean excludeDeletedEntities,
+ final int limit, final int offset )
throws AtlasServiceException {
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
- queryParams.add("typeName", typeName);
- queryParams.add("classification", classification);
- queryParams.add(QUERY, query);
+ queryParams.add("guid", guid);
+ queryParams.add("relation", relation);
queryParams.add("excludeDeletedEntities",
String.valueOf(excludeDeletedEntities));
queryParams.add(LIMIT, String.valueOf(limit));
queryParams.add(OFFSET, String.valueOf(offset));
- return callAPI(API_V2.BASIC_SEARCH, AtlasSearchResult.class,
queryParams);
+ return callAPI(API_V2.GET_RELATIONSHIP, AtlasSearchResult.class,
queryParams);
}
public AtlasSearchResult facetedSearch(SearchParameters searchParameters)
throws AtlasServiceException {
@@ -454,8 +473,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
public static final API_V2 LINEAGE_INFO = new
API_V2(LINEAGE_URI, HttpMethod.GET, Response.Status.OK);
public static final API_V2 DSL_SEARCH = new
API_V2(DSL_URI, HttpMethod.GET, Response.Status.OK);
public static final API_V2 FULL_TEXT_SEARCH = new
API_V2(FULL_TEXT_URI, HttpMethod.GET, Response.Status.OK);
- public static final API_V2 BASIC_SEARCH = new
API_V2(BASIC_SEARCH_URI, HttpMethod.GET, Response.Status.OK);
+ public static final API_V2 BASIC_SEARCH = new
API_V2(BASIC_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
public static final API_V2 FACETED_SEARCH = new
API_V2(FACETED_SEARCH_URI, HttpMethod.POST, Response.Status.OK);
+ public static final API_V2 GET_RELATIONSHIP = new
API_V2(RELATIONSHIP_URI, HttpMethod.GET, Response.Status.OK);
private API_V2(String path, String method, Response.Status status) {
super(path, method, status);