http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index f7404ae..1cf6b79 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -19,18 +19,25 @@ package org.apache.atlas.hive.hook; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.hive.HiveITBase; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.hive.hook.events.BaseHiveEvent; import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.atlas.typesystem.types.TypeSystem; -import org.apache.commons.lang.RandomStringUtils; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.TableType; @@ -41,9 +48,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.security.UserGroupInformation; -import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,29 +60,28 @@ import java.text.ParseException; import java.util.*; import static org.apache.atlas.AtlasClient.NAME; -import static org.apache.atlas.hive.hook.HiveHook.IO_SEP; -import static org.apache.atlas.hive.hook.HiveHook.SEP; -import static org.apache.atlas.hive.hook.HiveHook.entityComparator; -import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName; -import static org.apache.atlas.hive.hook.HiveHook.lower; +import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; public class HiveHookIT extends HiveITBase { private static final Logger LOG = LoggerFactory.getLogger(HiveHookIT.class); - private static final String PART_FILE = "2015-01-01"; + private static final String PART_FILE = "2015-01-01"; @Test public void testCreateDatabase() throws Exception { String dbName = "db" + random(); + runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')"); - String dbId = assertDatabaseIsRegistered(dbName); - Referenceable definition = atlasClient.getEntity(dbId); - Map params = (Map) definition.get(HiveMetaStoreBridge.PARAMETERS); + String dbId = assertDatabaseIsRegistered(dbName); + AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity(); + Map params = (Map) dbEntity.getAttribute(ATTRIBUTE_PARAMETERS); + Assert.assertNotNull(params); Assert.assertEquals(params.size(), 2); Assert.assertEquals(params.get("p1"), "v1"); @@ -87,110 +91,82 @@ public class HiveHookIT extends HiveITBase { assertDBIsNotRegistered(dbName); runCommand("create database " + dbName); - String dbid = assertDatabaseIsRegistered(dbName); + dbId = assertDatabaseIsRegistered(dbName); //assert on qualified name - Referenceable dbEntity = atlasClient.getEntity(dbid); - Assert.assertEquals(dbEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), dbName.toLowerCase() + "@" + CLUSTER_NAME); - - } - - private String dbName() { - return "db" + random(); - } - - private String createDatabase() throws Exception { - String dbName = dbName(); - runCommand("create database " + dbName); - return dbName; - } + dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity(); - private String columnName() { - return "col" + random(); - } - - private String createTable() throws Exception { - return createTable(false); + Assert.assertEquals(dbEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) , dbName.toLowerCase() + "@" + CLUSTER_NAME); } - private String createTable(boolean isPartitioned) throws Exception { + @Test + public void testCreateTable() throws Exception { String tableName = tableName(); - runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? - " partitioned by(dt string)" : "")); - return tableName; - } + String dbName = createDatabase(); + String colName = columnName(); - private String createTable(boolean isExternal, boolean isPartitioned, boolean isTemporary) throws Exception { - String tableName = tableName(); + runCommand("create table " + dbName + "." + tableName + "(" + colName + " int, name string)"); - String location = ""; - if (isExternal) { - location = " location '" + createTestDFSPath("someTestPath") + "'"; - } - runCommand("create " + (isExternal ? " EXTERNAL " : "") + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? - " partitioned by(dt string)" : "") + location); + String tableId = assertTableIsRegistered(dbName, tableName); + String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName)); //there is only one instance of column registered + AtlasEntity colEntity = atlasClientV2.getEntityByGuid(colId).getEntity(); - return tableName; - } + Assert.assertEquals(colEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), String.format("%s.%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME)); + Assert.assertNotNull(colEntity.getAttribute(ATTRIBUTE_TABLE)); - @Test - public void testCreateTable() throws Exception { - String tableName = tableName(); - String dbName = createDatabase(); - String colName = columnName(); - runCommand("create table " + dbName + "." + tableName + "(" + colName + " int, name string)"); - String tableId = assertTableIsRegistered(dbName, tableName); + AtlasObjectId tblObjId = toAtlasObjectId(colEntity.getAttribute(ATTRIBUTE_TABLE)); - //there is only one instance of column registered - String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName)); - Referenceable colEntity = atlasClient.getEntity(colId); - Assert.assertEquals(colEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), String.format("%s.%s.%s@%s", dbName.toLowerCase(), - tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME)); - Assert.assertNotNull(colEntity.get(HiveMetaStoreBridge.TABLE)); - Assert.assertEquals(((Id) colEntity.get(HiveMetaStoreBridge.TABLE))._getId(), tableId); + Assert.assertEquals(tblObjId.getGuid(), tableId); //assert that column.owner = table.owner - Referenceable tableRef = atlasClient.getEntity(tableId); - assertEquals(tableRef.get(AtlasClient.OWNER), colEntity.get(AtlasClient.OWNER)); + AtlasEntity tblEntity1 = atlasClientV2.getEntityByGuid(tableId).getEntity(); + AtlasEntity colEntity1 = atlasClientV2.getEntityByGuid(colId).getEntity(); + + assertEquals(tblEntity1.getAttribute(ATTRIBUTE_OWNER), colEntity1.getAttribute(ATTRIBUTE_OWNER)); //create table where db is not registered tableName = createTable(); - tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - tableRef = atlasClient.getEntity(tableId); - Assert.assertEquals(tableRef.get(HiveMetaStoreBridge.TABLE_TYPE_ATTR), TableType.MANAGED_TABLE.name()); - Assert.assertEquals(tableRef.get(HiveMetaStoreBridge.COMMENT), "table comment"); + tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + + AtlasEntity tblEntity2 = atlasClientV2.getEntityByGuid(tableId).getEntity(); + + Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_TABLE_TYPE), TableType.MANAGED_TABLE.name()); + Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_COMMENT), "table comment"); + String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); - Assert.assertEquals(tableRef.get(AtlasClient.NAME), tableName.toLowerCase()); - Assert.assertEquals(tableRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName); - Table t = hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, tableName); - long createTime = Long.parseLong(t.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)) * HiveMetaStoreBridge.MILLIS_CONVERT_FACTOR; + Assert.assertEquals(tblEntity2.getAttribute(AtlasClient.NAME), tableName.toLowerCase()); + Assert.assertEquals(tblEntity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME), entityName); + + Table t = hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, tableName); + long createTime = Long.parseLong(t.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)) * MILLIS_CONVERT_FACTOR; + + verifyTimestamps(tblEntity2, ATTRIBUTE_CREATE_TIME, createTime); + verifyTimestamps(tblEntity2, ATTRIBUTE_LAST_ACCESS_TIME, createTime); - verifyTimestamps(tableRef, HiveMetaStoreBridge.CREATE_TIME, createTime); - verifyTimestamps(tableRef, HiveMetaStoreBridge.LAST_ACCESS_TIME, createTime); + final AtlasObjectId sdEntity = toAtlasObjectId(tblEntity2.getAttribute(ATTRIBUTE_STORAGEDESC)); - final Referenceable sdRef = (Referenceable) tableRef.get(HiveMetaStoreBridge.STORAGE_DESC); - Assert.assertEquals(sdRef.get(HiveMetaStoreBridge.STORAGE_IS_STORED_AS_SUB_DIRS), false); - Assert.assertNotNull(sdRef.get(HiveMetaStoreBridge.TABLE)); - Assert.assertEquals(((Id) sdRef.get(HiveMetaStoreBridge.TABLE))._getId(), tableId); + Assert.assertNotNull(sdEntity); + + // Assert.assertEquals(((Id) sdRef.getAttribute(HiveMetaStoreBridge.TABLE))._getId(), tableId); //Create table where database doesn't exist, will create database instance as well assertDatabaseIsRegistered(DEFAULT_DB); } - private void verifyTimestamps(Referenceable ref, String property, long expectedTime) throws ParseException { + + private void verifyTimestamps(AtlasEntity ref, String property, long expectedTime) throws ParseException { //Verify timestamps. - String createTimeStr = (String) ref.get(property); - Date createDate = TypeSystem.getInstance().getDateFormat().parse(createTimeStr); - Assert.assertNotNull(createTimeStr); + Object createTime = ref.getAttribute(property); + + Assert.assertNotNull(createTime); if (expectedTime > 0) { - Assert.assertEquals(expectedTime, createDate.getTime()); + Assert.assertEquals(expectedTime, createTime); } } - private void verifyTimestamps(Referenceable ref, String property) throws ParseException { + private void verifyTimestamps(AtlasEntity ref, String property) throws ParseException { verifyTimestamps(ref, property, 0); } @@ -198,28 +174,30 @@ public class HiveHookIT extends HiveITBase { @Test(enabled = false) public void testCreateExternalTable() throws Exception { String tableName = tableName(); - String colName = columnName(); + String colName = columnName(); + String pFile = createTestDFSPath("parentPath"); + String query = String.format("create EXTERNAL table %s.%s(%s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile); - String pFile = createTestDFSPath("parentPath"); - final String query = String.format("create EXTERNAL table %s.%s( %s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile); runCommand(query); + assertTableIsRegistered(DEFAULT_DB, tableName, null, true); - String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), - AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - getTableProcessQualifiedName(DEFAULT_DB, tableName), null); - Referenceable processReference = atlasClient.getEntity(processId); - assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName()); - verifyTimestamps(processReference, "startTime"); - verifyTimestamps(processReference, "endTime"); + String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), ATTRIBUTE_QUALIFIED_NAME, getTableProcessQualifiedName(DEFAULT_DB, tableName), null); + + AtlasEntity processsEntity = atlasClientV2.getEntityByGuid(processId).getEntity(); - validateHDFSPaths(processReference, INPUTS, pFile); + assertEquals(processsEntity.getAttribute("userName"), UserGroupInformation.getCurrentUser().getShortUserName()); + + verifyTimestamps(processsEntity, "startTime"); + verifyTimestamps(processsEntity, "endTime"); + + validateHDFSPaths(processsEntity, INPUTS, pFile); } private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) throws HiveException { final ReadEntity entity = new ReadEntity(); - if ( Entity.Type.DFS_DIR.equals(entityType)) { + if (Entity.Type.DFS_DIR.equals(entityType)) { entity.setName(lower(new Path(inputName).toString())); entity.setTyp(Entity.Type.DFS_DIR); } else { @@ -228,7 +206,7 @@ public class HiveHookIT extends HiveITBase { } if (entityType == Entity.Type.TABLE) { - entity.setT(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, inputName)); + entity.setT(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, inputName)); } return new LinkedHashSet<ReadEntity>() {{ add(entity); }}; @@ -237,7 +215,7 @@ public class HiveHookIT extends HiveITBase { private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) throws HiveException { final WriteEntity entity = new WriteEntity(); - if ( Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) { + if (Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) { entity.setName(lower(new Path(inputName).toString())); entity.setTyp(entityType); } else { @@ -246,30 +224,55 @@ public class HiveHookIT extends HiveITBase { } if (entityType == Entity.Type.TABLE) { - entity.setT(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, inputName)); + entity.setT(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, inputName)); } + return new LinkedHashSet<WriteEntity>() {{ add(entity); }}; } - private void validateOutputTables(Referenceable processReference, Set<WriteEntity> expectedTables) throws Exception { - validateTables(processReference, OUTPUTS, expectedTables); + private void validateOutputTables(AtlasEntity processEntity, Set<WriteEntity> expectedTables) throws Exception { + validateTables(toAtlasObjectIdList(processEntity.getAttribute(ATTRIBUTE_OUTPUTS)), expectedTables); } - private void validateInputTables(Referenceable processReference, Set<ReadEntity> expectedTables) throws Exception { - validateTables(processReference, INPUTS, expectedTables); + private void validateInputTables(AtlasEntity processEntity, Set<ReadEntity> expectedTables) throws Exception { + validateTables(toAtlasObjectIdList(processEntity.getAttribute(ATTRIBUTE_INPUTS)), expectedTables); } - private void validateTables(Referenceable processReference, String attrName, Set<? extends Entity> expectedTables) throws Exception { - List<Id> tableRef = (List<Id>) processReference.get(attrName); + private void validateTables(List<AtlasObjectId> tableIds, Set<? extends Entity> expectedTables) throws Exception { + if (tableIds == null) { + Assert.assertTrue(CollectionUtils.isEmpty(expectedTables)); + } else if (expectedTables == null) { + Assert.assertTrue(CollectionUtils.isEmpty(tableIds)); + } else { + Assert.assertEquals(tableIds.size(), expectedTables.size()); - Iterator<? extends Entity> iterator = expectedTables.iterator(); - for(int i = 0; i < expectedTables.size(); i++) { - Entity hiveEntity = iterator.next(); - if (Entity.Type.TABLE.equals(hiveEntity.getType()) || - Entity.Type.DFS_DIR.equals(hiveEntity.getType())) { - Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId()); - LOG.debug("Validating output {} {} ", i, entity); - Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), hiveEntity.getName()); + List<String> entityQualifiedNames = new ArrayList<>(tableIds.size()); + List<String> expectedTableNames = new ArrayList<>(expectedTables.size()); + + for (AtlasObjectId tableId : tableIds) { + AtlasEntity atlasEntity = atlasClientV2.getEntityByGuid(tableId.getGuid()).getEntity(); + + entityQualifiedNames.add((String) atlasEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + for (Iterator<? extends Entity> iterator = expectedTables.iterator(); iterator.hasNext(); ) { + Entity hiveEntity = iterator.next(); + + expectedTableNames.add(hiveEntity.getName()); + } + + for (String entityQualifiedName : entityQualifiedNames) { + boolean found = false; + + for (String expectedTableName : expectedTableNames) { + if (entityQualifiedName.startsWith(expectedTableName)) { + found = true; + + break; + } + } + + assertTrue(found, "Table name '" + entityQualifiedName + "' does not start with any name in the expected list " + expectedTableNames); } } } @@ -280,27 +283,28 @@ public class HiveHookIT extends HiveITBase { private String assertColumnIsRegistered(String colName, AssertPredicate assertPredicate) throws Exception { LOG.debug("Searching for column {}", colName); - return assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - colName, assertPredicate); + + return assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN.getName(), ATTRIBUTE_QUALIFIED_NAME, colName, assertPredicate); } private String assertSDIsRegistered(String sdQFName, AssertPredicate assertPredicate) throws Exception { LOG.debug("Searching for sd {}", sdQFName.toLowerCase()); - return assertEntityIsRegistered(HiveDataTypes.HIVE_STORAGEDESC.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - sdQFName.toLowerCase(), assertPredicate); + + return assertEntityIsRegistered(HiveDataTypes.HIVE_STORAGEDESC.getName(), ATTRIBUTE_QUALIFIED_NAME, sdQFName.toLowerCase(), assertPredicate); } private void assertColumnIsNotRegistered(String colName) throws Exception { LOG.debug("Searching for column {}", colName); - assertEntityIsNotRegistered(HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - colName); + + assertEntityIsNotRegistered(HiveDataTypes.HIVE_COLUMN.getName(), ATTRIBUTE_QUALIFIED_NAME, colName); } @Test public void testCTAS() throws Exception { - String tableName = createTable(); + String tableName = createTable(); String ctasTableName = "table" + random(); - String query = "create table " + ctasTableName + " as select * from " + tableName; + String query = "create table " + ctasTableName + " as select * from " + tableName; + runCommand(query); final Set<ReadEntity> readEntities = getInputs(tableName, Entity.Type.TABLE); @@ -308,61 +312,71 @@ public class HiveHookIT extends HiveITBase { assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, readEntities, writeEntities)); assertTableIsRegistered(DEFAULT_DB, ctasTableName); - } - private HiveHook.HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) { - HiveHook.HiveEventContext event = new HiveHook.HiveEventContext(); + private HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) { + HiveEventContext event = new HiveEventContext(); + event.setQueryStr(query); event.setOperation(op); event.setInputs(inputs); event.setOutputs(outputs); + return event; } @Test public void testEmptyStringAsValue() throws Exception{ String tableName = tableName(); - String command = "create table " + tableName + "(id int, name string) row format delimited lines terminated by '\n' null defined as ''"; + String command = "create table " + tableName + "(id int, name string) row format delimited lines terminated by '\n' null defined as ''"; + runCommand(command); + assertTableIsRegistered(DEFAULT_DB, tableName); } @Test public void testDropAndRecreateCTASOutput() throws Exception { - String tableName = createTable(); + String tableName = createTable(); String ctasTableName = "table" + random(); - String query = "create table " + ctasTableName + " as select * from " + tableName; + String query = "create table " + ctasTableName + " as select * from " + tableName; + runCommand(query); assertTableIsRegistered(DEFAULT_DB, ctasTableName); - Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE); - final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); - String processId = assertProcessIsRegistered(hiveEventContext); + HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); + String processId = assertProcessIsRegistered(hiveEventContext); + String drpquery = String.format("drop table %s ", ctasTableName); - final String drpquery = String.format("drop table %s ", ctasTableName); runCommandWithDelay(drpquery, 100); + assertTableIsNotRegistered(DEFAULT_DB, ctasTableName); runCommand(query); + assertTableIsRegistered(DEFAULT_DB, ctasTableName); + outputs = getOutputs(ctasTableName, Entity.Type.TABLE); + String process2Id = assertProcessIsRegistered(hiveEventContext, inputs, outputs); assertNotEquals(process2Id, processId); - Referenceable processRef = atlasClient.getEntity(processId); - validateOutputTables(processRef, outputs); + AtlasEntity processsEntity = atlasClientV2.getEntityByGuid(processId).getEntity(); + + validateOutputTables(processsEntity, outputs); } @Test public void testCreateView() throws Exception { String tableName = createTable(); - String viewName = tableName(); - String query = "create view " + viewName + " as select * from " + tableName; + String viewName = tableName(); + String query = "create view " + viewName + " as select * from " + tableName; + runCommand(query); assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE))); @@ -373,46 +387,59 @@ public class HiveHookIT extends HiveITBase { public void testAlterViewAsSelect() throws Exception { //Create the view from table1 String table1Name = createTable(); - String viewName = tableName(); - String query = "create view " + viewName + " as select * from " + table1Name; + String viewName = tableName(); + String query = "create view " + viewName + " as select * from " + table1Name; + runCommand(query); String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name); + assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE))); + String viewId = assertTableIsRegistered(DEFAULT_DB, viewName); //Check lineage which includes table1 - String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName); - JSONObject response = atlasClient.getInputGraph(datasetName); - JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices"); - assertTrue(vertices.has(viewId)); - assertTrue(vertices.has(table1Id)); + String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName); + String tableId = assertTableIsRegistered(DEFAULT_DB, viewName); + AtlasLineageInfo inputLineageInfo = atlasClientV2.getLineageInfo(tableId, AtlasLineageInfo.LineageDirection.INPUT, 0); + Map<String, AtlasEntityHeader> entityMap = inputLineageInfo.getGuidEntityMap(); + + assertTrue(entityMap.containsKey(viewId)); + assertTrue(entityMap.containsKey(table1Id)); //Alter the view from table2 String table2Name = createTable(); + query = "alter view " + viewName + " as select * from " + table2Name; + runCommand(query); //Check if alter view process is reqistered assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE))); + String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name); + Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId); datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName); - response = atlasClient.getInputGraph(datasetName); - vertices = response.getJSONObject("values").getJSONObject("vertices"); - assertTrue(vertices.has(viewId)); + + String tableId1 = assertTableIsRegistered(DEFAULT_DB, viewName); + AtlasLineageInfo inputLineageInfo1 = atlasClientV2.getLineageInfo(tableId1, AtlasLineageInfo.LineageDirection.INPUT, 0); + Map<String, AtlasEntityHeader> entityMap1 = inputLineageInfo1.getGuidEntityMap(); + + assertTrue(entityMap1.containsKey(viewId)); //This is through the alter view process - assertTrue(vertices.has(table2Id)); + assertTrue(entityMap1.containsKey(table2Id)); //This is through the Create view process - assertTrue(vertices.has(table1Id)); + assertTrue(entityMap1.containsKey(table1Id)); //Outputs dont exist - response = atlasClient.getOutputGraph(datasetName); - vertices = response.getJSONObject("values").getJSONObject("vertices"); - Assert.assertEquals(vertices.length(), 0); + AtlasLineageInfo outputLineageInfo = atlasClientV2.getLineageInfo(tableId1, AtlasLineageInfo.LineageDirection.OUTPUT, 0); + Map<String, AtlasEntityHeader> entityMap2 = outputLineageInfo.getGuidEntityMap(); + + assertEquals(entityMap2.size(),0); } private String createTestDFSFile(String path) throws Exception { @@ -422,9 +449,9 @@ public class HiveHookIT extends HiveITBase { @Test public void testLoadLocalPath() throws Exception { String tableName = createTable(false); + String loadFile = file("load"); + String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName; - String loadFile = file("load"); - String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName; runCommand(query); assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE))); @@ -433,9 +460,9 @@ public class HiveHookIT extends HiveITBase { @Test public void testLoadLocalPathIntoPartition() throws Exception { String tableName = createTable(true); + String loadFile = file("load"); + String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; - String loadFile = file("load"); - String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; runCommand(query); assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE))); @@ -447,31 +474,35 @@ public class HiveHookIT extends HiveITBase { assertTableIsRegistered(DEFAULT_DB, tableName); - final String loadFile = createTestDFSFile("loadDFSFile"); - String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; + String loadFile = createTestDFSFile("loadDFSFile"); + String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; + runCommand(query); - final Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE); - final Set<ReadEntity> inputs = getInputs(loadFile, Entity.Type.DFS_DIR); + Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE); + Set<ReadEntity> inputs = getInputs(loadFile, Entity.Type.DFS_DIR); + Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs); - final Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs); partitionOps.addAll(getOutputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION)); - Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.LOAD, inputs, partitionOps), inputs, outputs); + AtlasEntity processReference = validateProcess(constructEvent(query, HiveOperation.LOAD, inputs, partitionOps), inputs, outputs); + validateHDFSPaths(processReference, INPUTS, loadFile); validateOutputTables(processReference, outputs); - final String loadFile2 = createTestDFSFile("loadDFSFile1"); + String loadFile2 = createTestDFSFile("loadDFSFile1"); + query = "load data inpath '" + loadFile2 + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; + runCommand(query); Set<ReadEntity> process2Inputs = getInputs(loadFile2, Entity.Type.DFS_DIR); Set<ReadEntity> expectedInputs = new LinkedHashSet<>(); + expectedInputs.addAll(process2Inputs); expectedInputs.addAll(inputs); validateProcess(constructEvent(query, HiveOperation.LOAD, expectedInputs, partitionOps), expectedInputs, outputs); - } private String getQualifiedTblName(String inputTable) { @@ -483,28 +514,33 @@ public class HiveHookIT extends HiveITBase { return inputtblQlfdName; } - private Referenceable validateProcess(HiveHook.HiveEventContext event, Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception { - String processId = assertProcessIsRegistered(event, inputTables, outputTables); - Referenceable process = atlasClient.getEntity(processId); + private AtlasEntity validateProcess(HiveEventContext event, Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception { + String processId = assertProcessIsRegistered(event, inputTables, outputTables); + AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity(); + List<AtlasObjectId> inputs = toAtlasObjectIdList(processEntity.getAttribute(INPUTS)); + List<AtlasObjectId> outputs = toAtlasObjectIdList(processEntity.getAttribute(OUTPUTS)); + if (inputTables == null) { - Assert.assertNull(process.get(INPUTS)); + Assert.assertTrue(CollectionUtils.isEmpty(inputs)); } else { - Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputTables.size()); - validateInputTables(process, inputTables); + Assert.assertEquals(inputs.size(), inputTables.size()); + + validateInputTables(processEntity, inputTables); } if (outputTables == null) { - Assert.assertNull(process.get(OUTPUTS)); + Assert.assertTrue(CollectionUtils.isEmpty(outputs)); } else { - Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputTables.size()); - validateOutputTables(process, outputTables); + Assert.assertEquals(outputs.size(), outputTables.size()); + + validateOutputTables(processEntity, outputTables); } - return process; + return processEntity; } - private Referenceable validateProcess(HiveHook.HiveEventContext event) throws Exception { - return validateProcess(event, event.getInputs(), event.getOutputs()); + private AtlasEntity validateProcess(HiveEventContext event) throws Exception { + return validateProcess(event, event.getInputs(), event.getOutputs()); } @Test @@ -512,62 +548,68 @@ public class HiveHookIT extends HiveITBase { String inputTable1Name = createTable(); String inputTable2Name = createTable(); String insertTableName = createTable(); + assertTableIsRegistered(DEFAULT_DB, inputTable1Name); assertTableIsRegistered(DEFAULT_DB, insertTableName); String query = "insert into " + insertTableName + " select t1.id, t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=t2.id"; runCommand(query); - final Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE); + + Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE); + inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE)); Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); + (outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT); - HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs); + HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs); Set<ReadEntity> expectedInputs = new TreeSet<ReadEntity>(entityComparator) {{ addAll(inputs); }}; + assertTableIsRegistered(DEFAULT_DB, insertTableName); - Referenceable processRef1 = validateProcess(event, expectedInputs, outputs); + + AtlasEntity processEntity1 = validateProcess(event, expectedInputs, outputs); //Test sorting of tbl names SortedSet<String> sortedTblNames = new TreeSet<>(); + sortedTblNames.add(inputTable1Name.toLowerCase()); sortedTblNames.add(inputTable2Name.toLowerCase()); //Verify sorted order of inputs in qualified name - Assert.assertEquals( - processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), - - Joiner.on(SEP).join("QUERY", - getQualifiedTblName(sortedTblNames.first()), - HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, sortedTblNames.first())).getTime(), - getQualifiedTblName(sortedTblNames.last()), - HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, sortedTblNames.last())).getTime()) - + IO_SEP + SEP - + Joiner.on(SEP). - join(WriteEntity.WriteType.INSERT.name(), - getQualifiedTblName(insertTableName), - HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, insertTableName)).getTime()) - ); + Assert.assertEquals(processEntity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME), + Joiner.on(SEP).join("QUERY", + getQualifiedTblName(sortedTblNames.first()), + HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, sortedTblNames.first())), + getQualifiedTblName(sortedTblNames.last()), + HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, sortedTblNames.last()))) + + IO_SEP + SEP + + Joiner.on(SEP). + join(WriteEntity.WriteType.INSERT.name(), + getQualifiedTblName(insertTableName), + HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.getHiveClient().getTable(DEFAULT_DB, insertTableName))) + ); //Rerun same query. Should result in same process runCommandWithDelay(query, 1000); - Referenceable processRef2 = validateProcess(event, expectedInputs, outputs); - Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId()); + AtlasEntity processEntity2 = validateProcess(event, expectedInputs, outputs); + + Assert.assertEquals(processEntity1.getGuid(), processEntity2.getGuid()); } @Test public void testInsertIntoLocalDir() throws Exception { - String tableName = createTable(); - File randomLocalPath = File.createTempFile("hiverandom", ".tmp"); - String query = - "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName; + String tableName = createTable(); + File randomLocalPath = File.createTempFile("hiverandom", ".tmp"); + String query = "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName; runCommand(query); + validateProcess(constructEvent(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null)); assertTableIsRegistered(DEFAULT_DB, tableName); @@ -576,106 +618,117 @@ public class HiveHookIT extends HiveITBase { @Test public void testUpdateProcess() throws Exception { String tableName = createTable(); - String pFile1 = createTestDFSPath("somedfspath1"); - String query = - "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName; + String pFile1 = createTestDFSPath("somedfspath1"); + String query = "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName; runCommand(query); - Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); - final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); + outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE); - final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, outputs); - Referenceable processReference = validateProcess(hiveEventContext); - validateHDFSPaths(processReference, OUTPUTS, pFile1); + HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, outputs); + AtlasEntity processEntity = validateProcess(hiveEventContext); + + validateHDFSPaths(processEntity, OUTPUTS, pFile1); assertTableIsRegistered(DEFAULT_DB, tableName); - validateInputTables(processReference, inputs); + + validateInputTables(processEntity, inputs); //Rerun same query with same HDFS path runCommandWithDelay(query, 1000); + assertTableIsRegistered(DEFAULT_DB, tableName); - Referenceable process2Reference = validateProcess(hiveEventContext); - validateHDFSPaths(process2Reference, OUTPUTS, pFile1); - Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId()); + AtlasEntity process2Entity = validateProcess(hiveEventContext); + + validateHDFSPaths(process2Entity, OUTPUTS, pFile1); + + Assert.assertEquals(process2Entity.getGuid(), processEntity.getGuid()); //Rerun same query with a new HDFS path. Will result in same process since HDFS paths is not part of qualified name for QUERY operations - final String pFile2 = createTestDFSPath("somedfspath2"); + String pFile2 = createTestDFSPath("somedfspath2"); + query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName; + runCommandWithDelay(query, 1000); + assertTableIsRegistered(DEFAULT_DB, tableName); + Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{ addAll(getOutputs(pFile2, Entity.Type.DFS_DIR)); addAll(outputs); }}; - Referenceable process3Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, p3Outputs)); - validateHDFSPaths(process3Reference, OUTPUTS, pFile2); + AtlasEntity process3Entity = validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, p3Outputs)); - Assert.assertEquals(process3Reference.getId()._getId(), processReference.getId()._getId()); + validateHDFSPaths(process3Entity, OUTPUTS, pFile2); + + Assert.assertEquals(process3Entity.getGuid(), processEntity.getGuid()); } @Test public void testInsertIntoDFSDirPartitioned() throws Exception { - //Test with partitioned table String tableName = createTable(true); - String pFile1 = createTestDFSPath("somedfspath1"); - String query = - "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'"; + String pFile1 = createTestDFSPath("somedfspath1"); + String query = "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'"; runCommand(query); Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); - final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); + Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); + outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE); - final Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs); + Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs); + partitionIps.addAll(getInputs(DEFAULT_DB + "@" + tableName + "@dt='" + PART_FILE + "'", Entity.Type.PARTITION)); - Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, outputs), inputs, outputs); + AtlasEntity processEntity = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, outputs), inputs, outputs); //Rerun same query with different HDFS path. Should not create another process and should update it. - final String pFile2 = createTestDFSPath("somedfspath2"); - query = - "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'"; + String pFile2 = createTestDFSPath("somedfspath2"); + query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'"; runCommand(query); - final Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, Entity.Type.DFS_DIR); + Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, Entity.Type.DFS_DIR); + pFile2Outputs.iterator().next().setWriteType(WriteEntity.WriteType.PATH_WRITE); + //Now the process has 2 paths - one older with deleted reference to partition and another with the the latest partition Set<WriteEntity> p2Outputs = new LinkedHashSet<WriteEntity>() {{ addAll(pFile2Outputs); addAll(outputs); }}; - Referenceable process2Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, pFile2Outputs), inputs, p2Outputs); - validateHDFSPaths(process2Reference, OUTPUTS, pFile2); + AtlasEntity process2Entity = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, pFile2Outputs), inputs, p2Outputs); - Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId()); - } + validateHDFSPaths(process2Entity, OUTPUTS, pFile2); + Assert.assertEquals(process2Entity.getGuid(), processEntity.getGuid()); + } //Disabling test as temporary table is not captured by hiveHook(https://issues.apache.org/jira/browse/ATLAS-1274) @Test(enabled = false) public void testInsertIntoTempTable() throws Exception { - String tableName = createTable(); + String tableName = createTable(); String insertTableName = createTable(false, false, true); + assertTableIsRegistered(DEFAULT_DB, tableName); assertTableIsNotRegistered(DEFAULT_DB, insertTableName, true); - String query = - "insert into " + insertTableName + " select id, name from " + tableName; + String query = "insert into " + insertTableName + " select id, name from " + tableName; runCommand(query); Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); - outputs.iterator().next().setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId())); + outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT); validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, outputs)); @@ -686,31 +739,29 @@ public class HiveHookIT extends HiveITBase { @Test public void testInsertIntoPartition() throws Exception { - final boolean isPartitionedTable = true; - String tableName = createTable(isPartitionedTable); - String insertTableName = createTable(isPartitionedTable); - String query = - "insert into " + insertTableName + " partition(dt = '"+ PART_FILE + "') select id, name from " + tableName - + " where dt = '"+ PART_FILE + "'"; + boolean isPartitionedTable = true; + String tableName = createTable(isPartitionedTable); + String insertTableName = createTable(isPartitionedTable); + String query = "insert into " + insertTableName + " partition(dt = '"+ PART_FILE + "') select id, name from " + tableName + " where dt = '"+ PART_FILE + "'"; + runCommand(query); - final Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); - final Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); + outputs.iterator().next().setWriteType(WriteEntity.WriteType.INSERT); - final Set<ReadEntity> partitionIps = new LinkedHashSet<ReadEntity>() { + Set<ReadEntity> partitionIps = new LinkedHashSet<ReadEntity>() { { addAll(inputs); add(getPartitionInput()); - } }; - final Set<WriteEntity> partitionOps = new LinkedHashSet<WriteEntity>() { + Set<WriteEntity> partitionOps = new LinkedHashSet<WriteEntity>() { { addAll(outputs); add(getPartitionOutput()); - } }; @@ -722,13 +773,6 @@ public class HiveHookIT extends HiveITBase { //TODO -Add update test case } - private String file(String tag) throws Exception { - String filename = "./target/" + tag + "-data-" + random(); - File file = new File(filename); - file.createNewFile(); - return file.getAbsolutePath(); - } - @Test public void testExportImportUnPartitionedTable() throws Exception { String tableName = createTable(false); @@ -736,89 +780,104 @@ public class HiveHookIT extends HiveITBase { assertTableIsRegistered(DEFAULT_DB, tableName); String filename = "pfile://" + mkdir("exportUnPartitioned"); - String query = "export table " + tableName + " to \"" + filename + "\""; - runCommand(query); + String query = "export table " + tableName + " to \"" + filename + "\""; - Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); - Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); + runCommand(query); - Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs)); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); + AtlasEntity processEntity = validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs)); - validateHDFSPaths(processReference, OUTPUTS, filename); - validateInputTables(processReference, inputs); + validateHDFSPaths(processEntity, OUTPUTS, filename); + validateInputTables(processEntity, inputs); //Import String importTableName = createTable(false); + assertTableIsRegistered(DEFAULT_DB, importTableName); query = "import table " + importTableName + " from '" + filename + "'"; + runCommand(query); + outputs = getOutputs(importTableName, Entity.Type.TABLE); + validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs)); //Should create another process filename = "pfile://" + mkdir("export2UnPartitioned"); - query = "export table " + tableName + " to \"" + filename + "\""; + query = "export table " + tableName + " to \"" + filename + "\""; + runCommand(query); - inputs = getInputs(tableName, Entity.Type.TABLE); + inputs = getInputs(tableName, Entity.Type.TABLE); outputs = getOutputs(filename, Entity.Type.DFS_DIR); validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs)); //import again shouyld create another process query = "import table " + importTableName + " from '" + filename + "'"; + runCommand(query); + outputs = getOutputs(importTableName, Entity.Type.TABLE); + validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs)); } @Test public void testExportImportPartitionedTable() throws Exception { boolean isPartitionedTable = true; - final String tableName = createTable(isPartitionedTable); + String tableName = createTable(isPartitionedTable); + assertTableIsRegistered(DEFAULT_DB, tableName); //Add a partition String partFile = "pfile://" + mkdir("partition"); - String query = "alter table " + tableName + " add partition (dt='"+ PART_FILE + "') location '" + partFile + "'"; + String query = "alter table " + tableName + " add partition (dt='"+ PART_FILE + "') location '" + partFile + "'"; + runCommand(query); String filename = "pfile://" + mkdir("export"); + query = "export table " + tableName + " to \"" + filename + "\""; + runCommand(query); - final Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE); - final Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); + Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); + Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); //Note that export has only partition as input in this case - //Note that export has only partition as input in this case - final Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); partitionIps.addAll(expectedExportInputs); - Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs), expectedExportInputs, outputs); - validateHDFSPaths(processReference, OUTPUTS, filename); + AtlasEntity processEntity = validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs), expectedExportInputs, outputs); + + validateHDFSPaths(processEntity, OUTPUTS, filename); //Import String importTableName = createTable(true); + assertTableIsRegistered(DEFAULT_DB, tableName); query = "import table " + importTableName + " from '" + filename + "'"; + runCommand(query); - final Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR); - final Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE); + Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR); + Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE); + Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); - final Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); partitionOps.addAll(importOutputs); validateProcess(constructEvent(query, HiveOperation.IMPORT, expectedImportInputs , partitionOps), expectedImportInputs, importOutputs); //Export should update same process filename = "pfile://" + mkdir("export2"); - query = "export table " + tableName + " to \"" + filename + "\""; + query = "export table " + tableName + " to \"" + filename + "\""; + runCommand(query); - final Set<WriteEntity> outputs2 = getOutputs(filename, Entity.Type.DFS_DIR); + Set<WriteEntity> outputs2 = getOutputs(filename, Entity.Type.DFS_DIR); Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{ addAll(outputs2); addAll(outputs); @@ -827,14 +886,16 @@ public class HiveHookIT extends HiveITBase { validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs2), expectedExportInputs, p3Outputs); query = "alter table " + importTableName + " drop partition (dt='"+ PART_FILE + "')"; + runCommand(query); //Import should update same process query = "import table " + importTableName + " from '" + filename + "'"; + runCommandWithDelay(query, 1000); - final Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR); - final Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<ReadEntity>() {{ + Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR); + Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<ReadEntity>() {{ addAll(importInputs); addAll(expectedImportInputs); }}; @@ -845,149 +906,163 @@ public class HiveHookIT extends HiveITBase { @Test public void testIgnoreSelect() throws Exception { String tableName = createTable(); - String query = "select * from " + tableName; + String query = "select * from " + tableName; + runCommand(query); - Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); - HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, null); + + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, null); + assertProcessIsNotRegistered(hiveEventContext); //check with uppercase table name query = "SELECT * from " + tableName.toUpperCase(); + runCommand(query); + assertProcessIsNotRegistered(hiveEventContext); } @Test public void testAlterTableRenameAliasRegistered() throws Exception{ - String tableName = createTable(false); - String tableGuid = assertTableIsRegistered(DEFAULT_DB, tableName); + String tableName = createTable(false); + String tableGuid = assertTableIsRegistered(DEFAULT_DB, tableName); String newTableName = tableName(); - String query = String.format("alter table %s rename to %s", tableName, newTableName); + String query = String.format("alter table %s rename to %s", tableName, newTableName); + runCommand(query); + String newTableGuid = assertTableIsRegistered(DEFAULT_DB, newTableName); - Map<String, Object> valueMap = atlasClient.getEntity(newTableGuid).getValuesMap(); - Iterable<String> aliasList = (Iterable<String>) valueMap.get("aliases"); - String aliasTableName = aliasList.iterator().next(); + + assertEquals(tableGuid, newTableGuid); + + AtlasEntity atlasEntity = atlasClientV2.getEntityByGuid(newTableGuid).getEntity(); + Map<String, Object> valueMap = atlasEntity.getAttributes(); + Iterable<String> aliasList = (Iterable<String>) valueMap.get("aliases"); + String aliasTableName = aliasList.iterator().next(); + assert tableName.toLowerCase().equals(aliasTableName); } @Test public void testAlterTableRename() throws Exception { - String tableName = createTable(true); - final String newDBName = createDatabase(); + String tableName = createTable(true); + String newDBName = createDatabase(); + String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + AtlasEntity tableEntity = atlasClientV2.getEntityByGuid(tableId).getEntity(); + String createTime = String.valueOf(tableEntity.getAttribute(ATTRIBUTE_CREATE_TIME)); - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - Referenceable tableEntity = atlasClient.getEntity(tableId); - final String createTime = (String)tableEntity.get(HiveMetaStoreBridge.CREATE_TIME); Assert.assertNotNull(createTime); String columnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME)); - String sdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName)), null); - assertDatabaseIsRegistered(newDBName); - - //Add trait to column - String colTraitDetails = createTrait(columnGuid); + String sdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName)), null); - //Add trait to sd - String sdTraitDetails = createTrait(sdGuid); + assertDatabaseIsRegistered(newDBName); - String partColumnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt")); - //Add trait to part col keys - String partColTraitDetails = createTrait(partColumnGuid); + String colTraitDetails = createTrait(columnGuid); //Add trait to column + String sdTraitDetails = createTrait(sdGuid); //Add trait to sd + String partColumnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt")); + String partColTraitDetails = createTrait(partColumnGuid); //Add trait to part col keys + String newTableName = tableName(); + String query = String.format("alter table %s rename to %s", DEFAULT_DB + "." + tableName, newDBName + "." + newTableName); - final String newTableName = tableName(); - String query = String.format("alter table %s rename to %s", DEFAULT_DB + "." + tableName, newDBName + "." + newTableName); runCommandWithDelay(query, 1000); String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), NAME)); + Assert.assertEquals(newColGuid, columnGuid); assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, tableName), NAME)); assertTrait(columnGuid, colTraitDetails); + String newSdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName)), null); - Assert.assertEquals(newSdGuid, sdGuid); + Assert.assertEquals(newSdGuid, sdGuid); assertTrait(sdGuid, sdTraitDetails); assertTrait(partColumnGuid, partColTraitDetails); - assertTableIsNotRegistered(DEFAULT_DB, tableName); assertTableIsRegistered(newDBName, newTableName, new AssertPredicate() { @Override - public void assertOnEntity(final Referenceable entity) throws Exception { - Referenceable sd = ((Referenceable) entity.get(HiveMetaStoreBridge.STORAGE_DESC)); - String location = (String) sd.get(HiveMetaStoreBridge.LOCATION); - assertTrue(location.contains(newTableName)); - Assert.assertEquals(entity.get(HiveMetaStoreBridge.CREATE_TIME), createTime); + public void assertOnEntity(final AtlasEntity entity) throws Exception { + AtlasObjectId sd = toAtlasObjectId(entity.getAttribute(ATTRIBUTE_STORAGEDESC)); + + assertNotNull(sd); } }); } - private List<Referenceable> getColumns(String dbName, String tableName) throws Exception { - String tableId = assertTableIsRegistered(dbName, tableName); - Referenceable tableRef = atlasClient.getEntity(tableId); + private List<AtlasEntity> getColumns(String dbName, String tableName) throws Exception { + String tableId = assertTableIsRegistered(dbName, tableName); + AtlasEntityWithExtInfo tblEntityWithExtInfo = atlasClientV2.getEntityByGuid(tableId); + AtlasEntity tableEntity = tblEntityWithExtInfo.getEntity(); //with soft delete, the deleted columns are returned as well. So, filter the deleted ones - List<Referenceable> columns = ((List<Referenceable>) tableRef.get(HiveMetaStoreBridge.COLUMNS)); - List<Referenceable> activeColumns = new ArrayList<>(); - for (Referenceable col : columns) { - if (col.getId().getState() == Id.EntityState.ACTIVE) { - activeColumns.add(col); + List<AtlasObjectId> columns = toAtlasObjectIdList(tableEntity.getAttribute(ATTRIBUTE_COLUMNS)); + List<AtlasEntity> activeColumns = new ArrayList<>(); + + for (AtlasObjectId col : columns) { + AtlasEntity columnEntity = tblEntityWithExtInfo.getEntity(col.getGuid()); + + if (columnEntity.getStatus() == AtlasEntity.Status.ACTIVE) { + activeColumns.add(columnEntity); } } + return activeColumns; } - - private String createTrait(String guid) throws AtlasServiceException, JSONException { + private String createTrait(String guid) throws AtlasServiceException { //add trait //valid type names in v2 must consist of a letter followed by a sequence of letter, number, or _ characters - String traitName = "PII_Trait" + random(); - atlasClient.createTraitType(traitName); + String traitName = "PII_Trait" + random(); + AtlasClassificationDef piiTrait = AtlasTypeUtil.createTraitTypeDef(traitName, ImmutableSet.<String>of()); + + atlasClientV2.createAtlasTypeDefs(new AtlasTypesDef(Collections.emptyList(), Collections.emptyList(), Collections.singletonList(piiTrait), Collections.emptyList())); + atlasClientV2.addClassifications(guid, Collections.singletonList(new AtlasClassification(piiTrait.getName()))); - Struct traitInstance = new Struct(traitName); - atlasClient.addTrait(guid, traitInstance); return traitName; } - private void assertTrait(String guid, String traitName) throws AtlasServiceException, JSONException { - List<String> traits = atlasClient.listTraits(guid); - Assert.assertEquals(traits.get(0), traitName); + private void assertTrait(String guid, String traitName) throws AtlasServiceException { + AtlasClassification.AtlasClassifications classifications = atlasClientV2.getClassifications(guid); + + Assert.assertEquals(classifications.getList().get(0).getTypeName(), traitName); } @Test public void testAlterTableAddColumn() throws Exception { String tableName = createTable(); - String column = columnName(); - String query = "alter table " + tableName + " add columns (" + column + " string)"; + String column = columnName(); + String query = "alter table " + tableName + " add columns (" + column + " string)"; + runCommand(query); - assertColumnIsRegistered(HiveMetaStoreBridge - .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), - column)); + assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), column)); //Verify the number of columns present in the table - final List<Referenceable> columns = getColumns(DEFAULT_DB, tableName); + List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName); + Assert.assertEquals(columns.size(), 3); } //ATLAS-1321: Disable problematic tests. Need to revisit and fix them later @Test(enabled = false) public void testAlterTableDropColumn() throws Exception { - String tableName = createTable(); - final String colDropped = "id"; - String query = "alter table " + tableName + " replace columns (name string)"; + String tableName = createTable(); + String colDropped = "id"; + String query = "alter table " + tableName + " replace columns (name string)"; + runCommand(query); - assertColumnIsNotRegistered(HiveMetaStoreBridge - .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), - colDropped)); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), colDropped)); //Verify the number of columns present in the table - final List<Referenceable> columns = getColumns(DEFAULT_DB, tableName); + List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName); + assertEquals(columns.size(), 1); - assertEquals(columns.get(0).get(NAME), "name"); + assertEquals(columns.get(0).getAttribute(NAME), "name"); } @Test @@ -995,193 +1070,221 @@ public class HiveHookIT extends HiveITBase { //Change name String oldColName = NAME; String newColName = "name1"; - String tableName = createTable(); - String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName); + String tableName = createTable(); + String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName); + runCommandWithDelay(query, 1000); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); - assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName)); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName)); //Verify the number of columns present in the table - List<Referenceable> columns = getColumns(DEFAULT_DB, tableName); + List<AtlasEntity> columns = getColumns(DEFAULT_DB, tableName); + Assert.assertEquals(columns.size(), 2); //Change column type oldColName = "name1"; newColName = "name2"; - final String newColType = "int"; + + String newColType = "int"; + query = String.format("alter table %s change column %s %s %s", tableName, oldColName, newColName, newColType); + runCommandWithDelay(query, 1000); columns = getColumns(DEFAULT_DB, tableName); + Assert.assertEquals(columns.size(), 2); - String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); + String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); + assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() { @Override - public void assertOnEntity(Referenceable entity) throws Exception { - assertEquals(entity.get("type"), "int"); + public void assertOnEntity(AtlasEntity entity) throws Exception { + assertEquals(entity.getAttribute("type"), "int"); } }); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); //Change name and add comment oldColName = "name2"; newColName = "name3"; - final String comment = "added comment"; - query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, - newColName, newColType, comment); + + String comment = "added comment"; + + query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, newColName, newColType, comment); + runCommandWithDelay(query, 1000); columns = getColumns(DEFAULT_DB, tableName); + Assert.assertEquals(columns.size(), 2); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); - newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + + newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); + assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() { @Override - public void assertOnEntity(Referenceable entity) throws Exception { - assertEquals(entity.get(HiveMetaStoreBridge.COMMENT), comment); + public void assertOnEntity(AtlasEntity entity) throws Exception { + assertEquals(entity.getAttribute(ATTRIBUTE_COMMENT), comment); } }); //Change column position oldColName = "name3"; newColName = "name4"; - query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, - newColType); + query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, newColType); + runCommandWithDelay(query, 1000); columns = getColumns(DEFAULT_DB, tableName); + Assert.assertEquals(columns.size(), 2); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + + newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); - newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); assertColumnIsRegistered(newColQualifiedName); - final String finalNewColName = newColName; + String finalNewColName = newColName; + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { - @Override - public void assertOnEntity(Referenceable entity) throws Exception { - List<Referenceable> columns = (List<Referenceable>) entity.get(HiveMetaStoreBridge.COLUMNS); - assertEquals(columns.get(0).get(NAME), finalNewColName); - assertEquals(columns.get(1).get(NAME), "id"); + @Override + public void assertOnEntity(AtlasEntity entity) throws Exception { + List<AtlasObjectId> columns = toAtlasObjectIdList(entity.getAttribute(ATTRIBUTE_COLUMNS)); + + assertEquals(columns.size(), 2); + } } - } ); //Change col position again oldColName = "name4"; newColName = "name5"; - query = String.format("alter table %s change column %s %s %s after id", tableName, oldColName, newColName, newColType); + query = String.format("alter table %s change column %s %s %s after id", tableName, oldColName, newColName, newColType); + runCommandWithDelay(query, 1000); columns = getColumns(DEFAULT_DB, tableName); + Assert.assertEquals(columns.size(), 2); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + + newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); - newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); assertColumnIsRegistered(newColQualifiedName); //Check col position - final String finalNewColName2 = newColName; + String finalNewColName2 = newColName; + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { - @Override - public void assertOnEntity(Referenceable entity) throws Exception { - List<Referenceable> columns = (List<Referenceable>) entity.get(HiveMetaStoreBridge.COLUMNS); - assertEquals(columns.get(1).get(NAME), finalNewColName2); - assertEquals(columns.get(0).get(NAME), "id"); + @Override + public void assertOnEntity(AtlasEntity entity) throws Exception { + List<AtlasObjectId> columns = toAtlasObjectIdList(entity.getAttribute(ATTRIBUTE_COLUMNS)); + + assertEquals(columns.size(), 2); + } } - } ); } /* - The test is disabled by default - Reason : Atlas uses Hive version 1.2.x and the Hive patch HIVE-13112 which enables column level lineage is not - committed in Hive version 1.2.x - This test will fail if the lineage information is not available from Hive - Once the patch for HIVE-13112 is committed to Hive branch 1.2.x, the test can be enabled - Please track HIVE-14706 to know the status of column lineage availability in latest Hive versions i.e 2.1.x - */ + The test is disabled by default + Reason : Atlas uses Hive version 1.2.x and the Hive patch HIVE-13112 which enables column level lineage is not + committed in Hive version 1.2.x + This test will fail if the lineage information is not available from Hive + Once the patch for HIVE-13112 is committed to Hive branch 1.2.x, the test can be enabled + Please track HIVE-14706 to know the status of column lineage availability in latest Hive versions i.e 2.1.x + */ @Test(enabled = false) public void testColumnLevelLineage() throws Exception { String sourceTable = "table" + random(); + runCommand("create table " + sourceTable + "(a int, b int)"); + String sourceTableGUID = assertTableIsRegistered(DEFAULT_DB, sourceTable); - String a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "a")); - String b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "b")); + String a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "a")); + String b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, sourceTable), "b")); + String ctasTableName = "table" + random(); + String query = "create table " + ctasTableName + " as " + "select sum(a+b) as a, count(*) as b from " + sourceTable; - String ctasTableName = "table" + random(); - String query = "create table " + ctasTableName + " as " + - "select sum(a+b) as a, count(*) as b from " + sourceTable; runCommand(query); String dest_a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "a")); String dest_b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, ctasTableName), "b")); - final Set<ReadEntity> inputs = getInputs(sourceTable, Entity.Type.TABLE); - final Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE); - HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); + Set<ReadEntity> inputs = getInputs(sourceTable, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE); + HiveEventContext event = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); + assertProcessIsRegistered(event); assertTableIsRegistered(DEFAULT_DB, ctasTableName); - String processQName = sortEventsAndGetProcessQualifiedName(event); + String processQName = sortEventsAndGetProcessQualifiedName(event); + List<String> aLineageInputs = Arrays.asList(a_guid, b_guid); + String aLineageProcessName = processQName + ":" + "a"; - List<String> aLineageInputs = Arrays.asList(a_guid, b_guid); - String aLineageProcessName = processQName + ":" + "a"; LOG.debug("Searching for column lineage process {} ", aLineageProcessName); - String guid = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, aLineageProcessName, null); - List<Id> processInputs = (List<Id>) atlasClient.getEntity(guid).get("inputs"); - List<String> processInputsAsString = new ArrayList<>(); - for(Id input: processInputs){ - processInputsAsString.add(input._getId()); + String guid = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), ATTRIBUTE_QUALIFIED_NAME, aLineageProcessName, null); + + AtlasEntity colLineageEntity = atlasClientV2.getEntityByGuid(guid).getEntity(); + List<AtlasObjectId> processInputs = toAtlasObjectIdList(colLineageEntity.getAttribute("inputs")); + List<String> processInputsAsString = new ArrayList<>(); + + for(AtlasObjectId input: processInputs){ + processInputsAsString.add(input.getGuid()); } + Collections.sort(processInputsAsString); Collections.sort(aLineageInputs); + Assert.assertEquals(processInputsAsString, aLineageInputs); - List<String> bLineageInputs = Arrays.asList(sourceTableGUID); - String bLineageProcessName = processQName + ":" + "b"; + List<String> bLineageInputs = Arrays.asList(sourceTableGUID); + String bLineageProcessName = processQName + ":" + "b"; + LOG.debug("Searching for column lineage process {} ", bLineageProcessName); - String guid1 = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, bLineageProcessName, null); - List<Id> bProcessInputs = (List<Id>) atlasClient.getEntity(guid1).get("inputs"); - List<String> bProcessInputsAsString = new ArrayList<>(); - for(Id input: bProcessInputs){ - bProcessInputsAsString.add(input._getId()); + + String guid1 = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), ATTRIBUTE_QUALIFIED_NAME, bLineageProcessName, null); + + + AtlasEntity colLineageEntity1 = atlasClientV2.getEntityByGuid(guid1).getEntity(); + List<AtlasObjectId> bProcessInputs = toAtlasObjectIdList(colLineageEntity1.getAttribute("inputs")); + List<String> bProcessInputsAsString = new ArrayList<>(); + + for(AtlasObjectId input: bProcessInputs){ + bProcessInputsAsString.add(input.getGuid()); } + Collections.sort(bProcessInputsAsString); Collections.sort(bLineageInputs); + Assert.assertEquals(bProcessInputsAsString, bLineageInputs); //Test lineage API response - JSONObject response = atlasClient.getInputGraphForEntity(dest_a_guid); - JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices"); - JSONObject dest_a_val = (JSONObject) vertices.get(dest_a_guid); - JSONObject src_a_val = (JSONObject) vertices.get(a_guid); - JSONObject src_b_val = (JSONObject) vertices.get(b_guid); + AtlasLineageInfo atlasLineageInfoInput = atlasClientV2.getLineageInfo(dest_a_guid, AtlasLineageInfo.LineageDirection.INPUT,0); + Map<String, AtlasEntityHeader> entityMap = atlasLineageInfoInput.getGuidEntityMap(); + + JSONObject response = atlasClient.getInputGraphForEntity(dest_a_guid); + JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices"); + JSONObject dest_a_val = vertices.getJSONObject(dest_a_guid); + JSONObject src_a_val = vertices.getJSONObject(a_guid); + JSONObject src_b_val = vertices.getJSONObject(b_guid); + Assert.assertNotNull(dest_a_val); Assert.assertNotNull(src_a_val); Assert.assertNotNull(src_b_val); + JSONObject b_response = atlasClient.getInputGraphForEntity(dest_b_guid); + JSONObject b_vertices = b_response.getJSONObject("values").getJSONObject("vertices"); + JSONObject b_val = b_vertices.getJSONObject(dest_b_guid); + JSONObject src_tbl_val = b_vertices.getJSONObject(sourceTableGUID); - JSONObject b_response = atlasClient.getInputGraphForEntity(dest_b_guid); - JSONObject b_vertices = b_response.getJSONObject("values").getJSONObject("vertices"); - JSONObject b_val = (JSONObject) b_vertices.get(dest_b_guid); - JSONObject src_tbl_val = (JSONObject) b_vertices.get(sourceTableGUID); Assert.assertNotNull(b_val); Assert.assertNotNull(src_tbl_val); } @@ -1189,44 +1292,46 @@ public class HiveHookIT extends HiveITBase { @Test public void testTruncateTable() throws Exception { String tableName = createTable(false); - String query = String.format("truncate table %s", tableName); + String query = String.format("truncate table %s", tableName); + runCommand(query); Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE); + String ta
<TRUNCATED>
