Repository: incubator-atlas
Updated Branches:
  refs/heads/master 7b07a222c -> 8c4a7faef
ATLAS-415 Hive import fails when importing a table that is already imported without StorageDescriptor information (yhemanth via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8c4a7fae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8c4a7fae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8c4a7fae

Branch: refs/heads/master
Commit: 8c4a7faef286a5ae5cf6f986f6796da60f0251ab
Parents: 7b07a22
Author: Shwetha GS <[email protected]>
Authored: Fri Jan 29 11:35:29 2016 +0530
Committer: Shwetha GS <[email protected]>
Committed: Fri Jan 29 11:35:29 2016 +0530

----------------------------------------------------------------------
 addons/hive-bridge/pom.xml                      |   5 +
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  | 243 +++++++++++++------
 .../hive/bridge/HiveMetaStoreBridgeTest.java    | 206 ++++++++++++++++
 pom.xml                                         |   2 +-
 release-log.txt                                 |   1 +
 5 files changed, 388 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index 20c25d1..f1cb130 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -127,6 +127,11 @@
         </dependency>

         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 40babe5..4680a3c 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -52,12 +52,16 @@ import java.util.List;

 /**
  * A Bridge Utility that imports metadata from the Hive Meta Store
- * and registers then in Atlas.
+ * and registers them in Atlas.
  */
 public class HiveMetaStoreBridge {
     private static final String DEFAULT_DGI_URL = "http://localhost:21000/";
     public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
+    public static final String DESCRIPTION_ATTR = "description";
+    public static final String TABLE_TYPE_ATTR = "tableType";
+    public static final String SEARCH_ENTRY_GUID_ATTR = "__guid";
+    public static final String LAST_ACCESS_TIME_ATTR = "lastAccessTime";

     private final String clusterName;

     public static final String ATLAS_ENDPOINT = "atlas.rest.address";
@@ -67,6 +71,12 @@ public class HiveMetaStoreBridge {
     public final Hive hiveClient;
     private final AtlasClient atlasClient;

+    /**
+     * Construct a HiveMetaStoreBridge.
+     * @param hiveConf {@link HiveConf} for Hive component in the cluster
+     * @param atlasConf {@link Configuration} for Atlas component in the cluster
+     * @throws Exception
+     */
     public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf) throws Exception {
         this(hiveConf, atlasConf, null, null);
     }
@@ -77,21 +87,28 @@ public class HiveMetaStoreBridge {

     /**
      * Construct a HiveMetaStoreBridge.
-     * @param hiveConf hive conf
+     * @param hiveConf {@link HiveConf} for Hive component in the cluster
+     * @param doAsUser The user accessing Atlas service
+     * @param ugi {@link UserGroupInformation} representing the Atlas service
      */
     public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf, String doAsUser,
                                UserGroupInformation ugi) throws Exception {
-        clusterName = hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-        hiveClient = Hive.get(hiveConf);
+        this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME),
+                Hive.get(hiveConf),
+                new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser));
+    }

-        atlasClient = new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser);
+    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
+        this.clusterName = clusterName;
+        this.hiveClient = hiveClient;
+        this.atlasClient = atlasClient;
     }

-    public AtlasClient getAtlasClient() {
+    private AtlasClient getAtlasClient() {
         return atlasClient;
     }

-    public void importHiveMetadata() throws Exception {
+    void importHiveMetadata() throws Exception {
         LOG.info("Importing hive metadata");
         importDatabases();
     }
@@ -106,27 +123,13 @@ public class HiveMetaStoreBridge {
     }

     /**
-     * Creates db entity
-     * @param hiveDB
-     * @return
+     * Create a Hive Database entity
+     * @param hiveDB The Hive {@link Database} object from which to map properties
+     * @return new Hive Database entity
      * @throws HiveException
      */
     public Referenceable createDBInstance(Database hiveDB) throws HiveException {
-        LOG.info("Importing objects from databaseName : " + hiveDB.getName());
-
-        Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        String dbName = hiveDB.getName().toLowerCase();
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
-        dbRef.set(HiveDataModelGenerator.NAME, dbName);
-        dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
-        dbRef.set("description", hiveDB.getDescription());
-        dbRef.set("locationUri", hiveDB.getLocationUri());
-        dbRef.set("parameters", hiveDB.getParameters());
-        dbRef.set("ownerName", hiveDB.getOwnerName());
-        if (hiveDB.getOwnerType() != null) {
-            dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
-        }
-        return dbRef;
+        return createOrUpdateDBInstance(hiveDB, null);
     }

     /**
@@ -137,12 +140,34 @@
      */
     private Referenceable registerDatabase(String databaseName) throws Exception {
         Referenceable dbRef = getDatabaseReference(clusterName, databaseName);
+        Database db = hiveClient.getDatabase(databaseName);
         if (dbRef == null) {
-            Database db = hiveClient.getDatabase(databaseName);
             dbRef = createDBInstance(db);
             dbRef = registerInstance(dbRef);
         } else {
-            LOG.info("Database {} is already registered with id {}", databaseName, dbRef.getId().id);
+            LOG.info("Database {} is already registered with id {}. Updating it.", databaseName, dbRef.getId().id);
+            dbRef = createOrUpdateDBInstance(db, dbRef);
+            updateInstance(dbRef);
+        }
+        return dbRef;
+    }
+
+    private Referenceable createOrUpdateDBInstance(Database hiveDB, Referenceable dbRef) {
+        LOG.info("Importing objects from databaseName : " + hiveDB.getName());
+
+        if (dbRef == null) {
+            dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
+        }
+        String dbName = hiveDB.getName().toLowerCase();
+        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName));
+        dbRef.set(HiveDataModelGenerator.NAME, dbName);
+        dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+        dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription());
+        dbRef.set("locationUri", hiveDB.getLocationUri());
+        dbRef.set("parameters", hiveDB.getParameters());
+        dbRef.set("ownerName", hiveDB.getOwnerName());
+        if (hiveDB.getOwnerType() != null) {
+            dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
         }
         return dbRef;
     }
@@ -153,7 +178,7 @@ public class HiveMetaStoreBridge {
      * @return
      * @throws Exception
      */
-    public Referenceable registerInstance(Referenceable referenceable) throws Exception {
+    private Referenceable registerInstance(Referenceable referenceable) throws Exception {
         String typeName = referenceable.getTypeName();
         LOG.debug("creating instance of type " + typeName);
@@ -176,11 +201,15 @@
         LOG.debug("Getting reference for database {}", databaseName);

         String typeName = HiveDataTypes.HIVE_DB.getName();
-        String dslQuery = String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
-                databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+        String dslQuery = getDatabaseDSLQuery(clusterName, databaseName, typeName);
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }

+    static String getDatabaseDSLQuery(String clusterName, String databaseName, String typeName) {
+        return String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME,
+                databaseName.toLowerCase(), HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+    }
+
     private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
         AtlasClient dgiClient = getAtlasClient();
         JSONArray results = dgiClient.searchByDSL(dslQuery);
@@ -198,6 +227,12 @@
         }
     }

+    /**
+     * Construct the qualified name used to uniquely identify a Database instance in Atlas.
+     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database
+     * @return Unique qualified name to identify the Database instance in Atlas.
+     */
     public static String getDBQualifiedName(String clusterName, String dbName) {
         return String.format("%s@%s", dbName.toLowerCase(), clusterName);
     }
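The DSL helpers extracted above (getDatabaseDSLQuery here, getTableDSLQuery further down) exist so that the new unit tests can stub AtlasClient.searchByDSL with the exact query strings the bridge issues. As a rough illustration of what the database query renders to, assuming HiveDataModelGenerator.NAME and CLUSTER_NAME resolve to the attribute names "name" and "clusterName" (an assumption, not verified from this patch):

    // Expected rendering (illustrative):
    //   hive_db where name = 'default' and clusterName = 'primary'
    String dslQuery = HiveMetaStoreBridge.getDatabaseDSLQuery("primary", "default",
            HiveDataTypes.HIVE_DB.getName());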
@@ -233,71 +268,109 @@
         LOG.debug("Getting reference for table {}.{}", dbName, tableName);

         String typeName = HiveDataTypes.HIVE_TABLE.getName();
-        String entityName = getTableQualifiedName(clusterName, dbName, tableName);
-        String dslQuery = String.format("%s as t where name = '%s'", typeName, entityName);
+        String dslQuery = getTableDSLQuery(getClusterName(), dbName, tableName, typeName);
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }

+    static String getTableDSLQuery(String clusterName, String dbName, String tableName, String typeName) {
+        String entityName = getTableQualifiedName(clusterName, dbName, tableName);
+        return String.format("%s as t where name = '%s'", typeName, entityName);
+    }
+
+    /**
+     * Construct the qualified name used to uniquely identify a Table instance in Atlas.
+     * @param clusterName Name of the cluster to which the Hive component belongs
+     * @param dbName Name of the Hive database to which the Table belongs
+     * @param tableName Name of the Hive table
+     * @return Unique qualified name to identify the Table instance in Atlas.
+     */
     public static String getTableQualifiedName(String clusterName, String dbName, String tableName) {
         return String.format("%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), clusterName);
     }

+    /**
+     * Create a new table instance in Atlas
+     * @param dbReference reference to a created Hive database {@link Referenceable} to which this table belongs
+     * @param hiveTable reference to the Hive {@link Table} from which to map properties
+     * @return Newly created Hive reference
+     * @throws Exception
+     */
     public Referenceable createTableInstance(Referenceable dbReference, Table hiveTable) throws Exception {
+        return createOrUpdateTableInstance(dbReference, null, hiveTable);
+    }
+
+    private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
+                                                      Table hiveTable) throws Exception {
         LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());

-        Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+        if (tableReference == null) {
+            tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+        }
         String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable.getDbName(), hiveTable.getTableName());
-        tableRef.set(HiveDataModelGenerator.NAME, tableQualifiedName);
-        tableRef.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase());
-        tableRef.set("owner", hiveTable.getOwner());
+        tableReference.set(HiveDataModelGenerator.NAME, tableQualifiedName);
+        tableReference.set(HiveDataModelGenerator.TABLE_NAME, hiveTable.getTableName().toLowerCase());
+        tableReference.set("owner", hiveTable.getOwner());

-        tableRef.set("createTime", hiveTable.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME));
-        tableRef.set("lastAccessTime", hiveTable.getLastAccessTime());
-        tableRef.set("retention", hiveTable.getRetention());
+        tableReference.set("createTime", hiveTable.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME));
+        tableReference.set("lastAccessTime", hiveTable.getLastAccessTime());
+        tableReference.set("retention", hiveTable.getRetention());

-        tableRef.set(HiveDataModelGenerator.COMMENT, hiveTable.getParameters().get(HiveDataModelGenerator.COMMENT));
+        tableReference.set(HiveDataModelGenerator.COMMENT,
+                hiveTable.getParameters().get(HiveDataModelGenerator.COMMENT));

         // add reference to the database
-        tableRef.set(HiveDataModelGenerator.DB, dbReference);
+        tableReference.set(HiveDataModelGenerator.DB, dbReference);

-        tableRef.set("columns", getColumns(hiveTable.getCols(), tableQualifiedName));
+        tableReference.set("columns", getColumns(hiveTable.getCols(), tableQualifiedName));

         // add reference to the StorageDescriptor
         Referenceable sdReferenceable = fillStorageDescStruct(hiveTable.getSd(), tableQualifiedName, tableQualifiedName);
-        tableRef.set("sd", sdReferenceable);
+        tableReference.set("sd", sdReferenceable);

         // add reference to the Partition Keys
         List<Referenceable> partKeys = getColumns(hiveTable.getPartitionKeys(), tableQualifiedName);
-        tableRef.set("partitionKeys", partKeys);
+        tableReference.set("partitionKeys", partKeys);

-        tableRef.set("parameters", hiveTable.getParameters());
+        tableReference.set("parameters", hiveTable.getParameters());

         if (hiveTable.getViewOriginalText() != null) {
-            tableRef.set("viewOriginalText", hiveTable.getViewOriginalText());
+            tableReference.set("viewOriginalText", hiveTable.getViewOriginalText());
         }

         if (hiveTable.getViewExpandedText() != null) {
-            tableRef.set("viewExpandedText", hiveTable.getViewExpandedText());
+            tableReference.set("viewExpandedText", hiveTable.getViewExpandedText());
         }

-        tableRef.set("tableType", hiveTable.getTableType().name());
-        tableRef.set("temporary", hiveTable.isTemporary());
-        return tableRef;
+        tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name());
+        tableReference.set("temporary", hiveTable.isTemporary());
+        return tableReference;
     }

     private Referenceable registerTable(Referenceable dbReference, Table table) throws Exception {
         String dbName = table.getDbName();
         String tableName = table.getTableName();
         LOG.info("Attempting to register table [" + tableName + "]");
-        Referenceable tableRef = getTableReference(dbName, tableName);
-        if (tableRef == null) {
-            tableRef = createTableInstance(dbReference, table);
-            tableRef = registerInstance(tableRef);
+        Referenceable tableReference = getTableReference(dbName, tableName);
+        if (tableReference == null) {
+            tableReference = createTableInstance(dbReference, table);
+            tableReference = registerInstance(tableReference);
         } else {
-            LOG.info("Table {}.{} is already registered with id {}", dbName, tableName, tableRef.getId().id);
+            LOG.info("Table {}.{} is already registered with id {}. Updating entity.", dbName, tableName,
+                    tableReference.getId().id);
+            tableReference = createOrUpdateTableInstance(dbReference, tableReference, table);
+            updateInstance(tableReference);
         }
-        return tableRef;
+        return tableReference;
+    }
+
+    private void updateInstance(Referenceable referenceable) throws AtlasServiceException {
+        String typeName = referenceable.getTypeName();
+        LOG.debug("updating instance of type " + typeName);
+
+        String entityJSON = InstanceSerialization.toJson(referenceable, true);
+        LOG.debug("Updating entity {} = {}", referenceable.getTypeName(), entityJSON);
+
+        atlasClient.updateEntity(referenceable.getId().id, referenceable);
     }
@@ -308,14 +381,13 @@
         if (results.length() == 0) {
             return null;
         }
-        String guid = results.getJSONObject(0).getString("__guid");
+        String guid = results.getJSONObject(0).getString(SEARCH_ENTRY_GUID_ATTR);
         return new Referenceable(guid, typeName, null);
     }

     private Referenceable getPartitionReference(String dbName, String tableName, List<String> values)
             throws Exception {
-        String valuesStr = "['" + StringUtils.join(values, "', '") + "']";
+        String valuesStr = joinPartitionValues(values);
         LOG.debug("Getting reference for partition for {}.{} with values {}", dbName, tableName, valuesStr);
-        String typeName = HiveDataTypes.HIVE_PARTITION.getName();

         //todo replace gremlin with DSL
         // String dslQuery = String.format("%s as p where values = %s, tableName where name = '%s', "
@@ -323,14 +395,23 @@
         //                               tableName,
         //                               dbName, clusterName);

-        String datasetType = AtlasClient.DATA_SET_SUPER_TYPE;
         String tableEntityName = getTableQualifiedName(clusterName, dbName, tableName);

-        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
+        String gremlinQuery = getPartitionGremlinQuery(valuesStr, tableEntityName);
+
+        return getEntityReferenceFromGremlin(HiveDataTypes.HIVE_PARTITION.getName(), gremlinQuery);
+    }
+
+    static String joinPartitionValues(List<String> values) {
+        return "['" + StringUtils.join(values, "', '") + "']";
+    }
+
+    static String getPartitionGremlinQuery(String valuesStr, String tableEntityName) {
+        String typeName = HiveDataTypes.HIVE_PARTITION.getName();
+        String datasetType = AtlasClient.DATA_SET_SUPER_TYPE;
+        return String.format("g.V.has('__typeName', '%s').has('%s.values', %s).as('p')."
                 + "out('__%s.table').has('%s.name', '%s').back('p').toList()", typeName, typeName, valuesStr,
                 typeName, datasetType, tableEntityName);
-
-        return getEntityReferenceFromGremlin(typeName, gremlinQuery);
     }

     private Referenceable getSDForTable(String dbName, String tableName) throws Exception {
@@ -369,15 +450,22 @@
             partRef = createPartitionReferenceable(tableReferenceable, sdReferenceable, hivePart);
             partRef = registerInstance(partRef);
         } else {
-            LOG.info("Partition {}.{} with values {} is already registered with id {}", dbName, tableName,
+            LOG.info("Partition {}.{} with values {} is already registered with id {}. Updating entity",
+                    dbName, tableName,
                     StringUtils.join(hivePart.getValues(), ","), partRef.getId().id);
+            partRef =
+                    createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, partRef);
+            updateInstance(partRef);
         }
         return partRef;
     }

-    public Referenceable createPartitionReferenceable(Referenceable tableReferenceable, Referenceable sdReferenceable,
-                                                      Partition hivePart) {
-        Referenceable partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
+    private Referenceable createOrUpdatePartitionReferenceable(Referenceable tableReferenceable,
+                                                               Referenceable sdReferenceable,
+                                                               Partition hivePart, Referenceable partRef) {
+        if (partRef == null) {
+            partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
+        }
         partRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getPartitionQualifiedName(hivePart));
         partRef.set("values", hivePart.getValues());
@@ -385,7 +473,7 @@

         //todo fix
         partRef.set("createTime", hivePart.getLastAccessTime());
-        partRef.set("lastAccessTime", hivePart.getLastAccessTime());
+        partRef.set(LAST_ACCESS_TIME_ATTR, hivePart.getLastAccessTime());

         // sdStruct = fillStorageDescStruct(hivePart.getSd());
         // Instead of creating copies of the sdstruct for partitions we are reusing existing
@@ -396,6 +484,18 @@
         return partRef;
     }

+    /**
+     * Create a Hive partition instance in Atlas
+     * @param tableReferenceable The Hive Table {@link Referenceable} to which this partition belongs.
+     * @param sdReferenceable The Storage descriptor {@link Referenceable} for this table.
+     * @param hivePart The Hive {@link Partition} object being created
+     * @return Newly created Hive partition instance
+     */
+    public Referenceable createPartitionReferenceable(Referenceable tableReferenceable, Referenceable sdReferenceable,
+                                                      Partition hivePart) {
+        return createOrUpdatePartitionReferenceable(tableReferenceable, sdReferenceable, hivePart, null);
+    }
+
     private String getPartitionQualifiedName(Partition partition) {
         return String.format("%s.%s.%s@%s", partition.getTable().getDbName(), partition.getTable().getTableName(),
                 StringUtils.join(partition.getValues(), "-"), clusterName);
@@ -480,6 +580,13 @@
         return colList;
     }

+    /**
+     * Register the Hive DataModel in Atlas, if not already defined.
+     *
+     * The method checks for the presence of the type {@link HiveDataTypes#HIVE_PROCESS} with the Atlas server.
+     * If this type is defined, then we assume the Hive DataModel is registered.
+     * @throws Exception
+     */
     public synchronized void registerHiveDataModel() throws Exception {
         HiveDataModelGenerator dataModelGenerator = new HiveDataModelGenerator();
         AtlasClient dgiClient = getAtlasClient();
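Taken together, registerDatabase, registerTable and importPartition now share one create-or-update shape: look the entity up by DSL or gremlin query, then either register a fresh Referenceable or repopulate the existing one and push it through updateInstance, which is what makes re-imports succeed instead of failing on already-registered tables. A minimal sketch of driving the bridge end to end (the properties file name is an assumption for illustration; the class sits in the bridge's own package because importHiveMetadata is now package-private):

    package org.apache.atlas.hive.bridge;

    import org.apache.commons.configuration.PropertiesConfiguration;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class ImportHiveMetadataSketch {
        public static void main(String[] args) throws Exception {
            HiveConf hiveConf = new HiveConf(); // reads hive-site.xml from the classpath
            // Assumed file name; atlas.rest.address falls back to DEFAULT_DGI_URL when unset.
            PropertiesConfiguration atlasConf = new PropertiesConfiguration("application.properties");

            HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(hiveConf, atlasConf);
            bridge.registerHiveDataModel(); // no-op if the hive types are already defined
            bridge.importHiveMetadata();    // re-running updates already-registered entities
        }
    }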
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
new file mode 100644
index 0000000..f8cfb71
--- /dev/null
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import scala.actors.threadpool.Arrays;
+
+import java.util.List;
+
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HiveMetaStoreBridgeTest {
+
+    private static final String TEST_DB_NAME = "default";
+    public static final String CLUSTER_NAME = "primary";
+    public static final String TEST_TABLE_NAME = "test_table";
+
+    @Mock
+    private Hive hiveClient;
+
+    @Mock
+    private AtlasClient atlasClient;
+
+    @BeforeMethod
+    public void initializeMocks() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testImportThatUpdatesRegisteredDatabase() throws Exception {
+        // setup database
+        when(hiveClient.getAllDatabases()).thenReturn(Arrays.asList(new String[]{TEST_DB_NAME}));
+        String description = "This is a default database";
+        when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(
+                new Database(TEST_DB_NAME, description, "/user/hive/default", null));
+        when(hiveClient.getAllTables(TEST_DB_NAME)).thenReturn(Arrays.asList(new String[]{}));
+
+        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+        bridge.importHiveMetadata();
+
+        // verify update is called
+        verify(atlasClient).updateEntity(eq("72e06b34-9151-4023-aa9d-b82103a50e76"),
+                (Referenceable) argThat(
+                        new MatchesReferenceableProperty(HiveMetaStoreBridge.DESCRIPTION_ATTR, description)));
+    }
+
+    @Test
+    public void testImportThatUpdatesRegisteredTable() throws Exception {
+        setupDB(hiveClient, TEST_DB_NAME);
+
+        setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+
+        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+
+        // return existing table
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME,
+                HiveDataTypes.HIVE_TABLE.getName()))).thenReturn(
+                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+        bridge.importHiveMetadata();
+
+        // verify update is called on table
+        verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"),
+                (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.TABLE_TYPE_ATTR,
+                        TableType.EXTERNAL_TABLE.name())));
+    }
+
+    private void returnExistingDatabase(String databaseName, AtlasClient atlasClient, String clusterName)
+            throws AtlasServiceException, JSONException {
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getDatabaseDSLQuery(clusterName, databaseName,
+                HiveDataTypes.HIVE_DB.getName()))).thenReturn(
+                getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
+    }
+
+    private Table setupTable(Hive hiveClient, String databaseName, String tableName) throws HiveException {
+        when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(new String[]{tableName}));
+        Table testTable = createTestTable(databaseName, tableName);
+        when(hiveClient.getTable(databaseName, tableName)).thenReturn(testTable);
+        return testTable;
+    }
+
+    private void setupDB(Hive hiveClient, String databaseName) throws HiveException {
+        when(hiveClient.getAllDatabases()).thenReturn(Arrays.asList(new String[]{databaseName}));
+        when(hiveClient.getDatabase(databaseName)).thenReturn(
+                new Database(databaseName, "Default database", "/user/hive/default", null));
+    }
+
+    @Test
+    public void testImportThatUpdatesRegisteredPartition() throws Exception {
+        setupDB(hiveClient, TEST_DB_NAME);
+        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+
+        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
+                TEST_TABLE_NAME,
+                HiveDataTypes.HIVE_TABLE.getName()))).thenReturn(
+                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+
+        Partition partition = mock(Partition.class);
+        when(partition.getTable()).thenReturn(hiveTable);
+        List partitionValues = Arrays.asList(new String[]{"name", "location"});
+        when(partition.getValues()).thenReturn(partitionValues);
+        int lastAccessTime = 1234512345;
+        when(partition.getLastAccessTime()).thenReturn(lastAccessTime);
+
+        when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new Partition[]{partition}));
+
+        when(atlasClient.searchByGremlin(
+                HiveMetaStoreBridge.getPartitionGremlinQuery(
+                        HiveMetaStoreBridge.joinPartitionValues(partitionValues),
+                        HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))).
+                thenReturn(getPartitionReference("9ae06b34-9151-3043-aa9d-b82103a50e99"));
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+        bridge.importHiveMetadata();
+
+        verify(atlasClient).updateEntity(eq("9ae06b34-9151-3043-aa9d-b82103a50e99"),
+                (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.LAST_ACCESS_TIME_ATTR,
+                        new Integer(lastAccessTime))));
+    }
+
+    private JSONArray getPartitionReference(String id) throws JSONException {
+        JSONObject resultEntry = new JSONObject();
+        resultEntry.put(HiveMetaStoreBridge.SEARCH_ENTRY_GUID_ATTR, id);
+        JSONArray results = new JSONArray();
+        results.put(resultEntry);
+        return results;
+    }
+
+    private JSONArray getEntityReference(String id) throws JSONException {
+        return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
+    }
+
+    private Referenceable createTableReference() {
+        Referenceable tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+        Referenceable sdReference = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
+        tableReference.set("sd", sdReference);
+        return tableReference;
+    }
+
+    private Table createTestTable(String databaseName, String tableName) throws HiveException {
+        Table table = new Table(databaseName, tableName);
+        table.setInputFormatClass(TextInputFormat.class);
+        table.setTableType(TableType.EXTERNAL_TABLE);
+        return table;
+    }
+
+    private class MatchesReferenceableProperty extends ArgumentMatcher<Object> {
+        private final String attrName;
+        private final Object attrValue;
+
+        public MatchesReferenceableProperty(String attrName, Object attrValue) {
+            this.attrName = attrName;
+            this.attrValue = attrValue;
+        }
+
+        @Override
+        public boolean matches(Object o) {
+            return attrValue.equals(((Referenceable) o).get(attrName));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7470e65..095a8dd 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1171,7 +1171,7 @@
                 <groupId>org.mockito</groupId>
                 <artifactId>mockito-all</artifactId>
                 <version>1.8.5</version>
-                <scope>provided</scope>
+                <scope>test</scope>
             </dependency>

             <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8c4a7fae/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82e4357..da1f744 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -7,6 +7,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)

 ALL CHANGES:
+ATLAS-415 Hive import fails when importing a table that is already imported without StorageDescriptor information (yhemanth via shwethags)
 ATLAS-450 quick_start fails on cygwin (dkantor via shwethags)
 ATLAS-451 Doc: Fix few broken links due to Wiki words in Atlas documentation (ssainath via shwethags)
 ATLAS-439 Investigate apache build failure - EntityJerseyResourceIT.testEntityDeduping (shwethags)
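A closing note on the test: MatchesReferenceableProperty is a compact Mockito 1.x idiom for verifying a call by a single attribute of a complex argument instead of whole-object equality, which keeps each assertion focused on the one property the test cares about. The same idiom in isolation (a self-contained sketch; EntityStore and all names here are invented for illustration):

    import java.util.HashMap;
    import java.util.Map;

    import org.mockito.ArgumentMatcher;

    import static org.mockito.Mockito.argThat;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    public class PropertyMatcherSketch {
        // Hypothetical collaborator, standing in for AtlasClient.updateEntity.
        interface EntityStore {
            void save(Map<String, Object> entity);
        }

        // Matches when the argument carries the expected value under the given key.
        static class HasEntry extends ArgumentMatcher<Map<String, Object>> {
            private final String key;
            private final Object value;

            HasEntry(String key, Object value) {
                this.key = key;
                this.value = value;
            }

            @Override
            public boolean matches(Object argument) {
                return value.equals(((Map) argument).get(key));
            }
        }

        public static void main(String[] args) {
            EntityStore store = mock(EntityStore.class);

            Map<String, Object> entity = new HashMap<String, Object>();
            entity.put("description", "This is a default database");
            store.save(entity);

            // Verify by one property, as HiveMetaStoreBridgeTest does with Referenceable.
            verify(store).save(argThat(new HasEntry("description", "This is a default database")));
        }
    }

With mockito-all 1.8.5 (now test scope per the pom change above), ArgumentMatcher is the hamcrest-backed abstract class used here; Mockito 2.x later turned it into an interface with a typed matches(T).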
