Repository: incubator-atlas
Updated Branches:
  refs/heads/master f623bddf8 -> f51c88615
ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries(sumasai) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f51c8861 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f51c8861 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f51c8861 Branch: refs/heads/master Commit: f51c886158c9c0f7dc115f0c6f0aa0e08772e0b9 Parents: f623bdd Author: Suma Shivaprasad <[email protected]> Authored: Fri Jul 1 12:09:32 2016 -0700 Committer: Suma Shivaprasad <[email protected]> Committed: Fri Jul 1 12:09:32 2016 -0700 ---------------------------------------------------------------------- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 4 +- .../org/apache/atlas/hive/hook/HiveHook.java | 182 +++++-- .../org/apache/atlas/hive/hook/HiveHookIT.java | 483 +++++++++++++------ .../java/org/apache/atlas/hook/AtlasHook.java | 2 +- release-log.txt | 1 + 5 files changed, 473 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 0045780..cd0e964 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -426,8 +426,8 @@ public class HiveMetaStoreBridge { createDate = new Date(hiveTable.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR); LOG.debug("Setting create time to {} ", createDate); tableReference.set(HiveDataModelGenerator.CREATE_TIME, createDate); - } catch(NumberFormatException ne) { - LOG.error("Error while updating createTime for the table {} ", hiveTable.getCompleteName(), ne); + } catch(Exception ne) { + LOG.error("Error while setting createTime for the table {} ", hiveTable.getCompleteName(), ne); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index a1a00b3..99009ba 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -21,6 +21,7 @@ package org.apache.atlas.hive.hook; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import kafka.security.auth.Write; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasConstants; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; @@ -66,7 +67,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -86,8 +89,8 @@ public class 
HiveHook extends AtlasHook implements ExecuteWithHookContext { public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize"; public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - private static final String SEP = ":".intern(); - private static final String IO_SEP = "->".intern(); + static final String SEP = ":".intern(); + static final String IO_SEP = "->".intern(); private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>(); @@ -291,6 +294,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) { if (event.getOutputs().size() > 1) { LOG.info("Starting deletion of tables and databases with cascade {} ", event.getQueryStr()); + } else { + LOG.info("Starting deletion of database {} ", event.getQueryStr()); } for (WriteEntity output : event.getOutputs()) { @@ -549,10 +554,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return str.toLowerCase().trim(); } - public static String normalize(String queryStr) { - return lower(queryStr); - } - private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception { Set<ReadEntity> inputs = event.getInputs(); Set<WriteEntity> outputs = event.getOutputs(); @@ -567,8 +568,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { LOG.info("Query id/plan is missing for {}", event.getQueryStr()); } - final SortedMap<Entity, Referenceable> source = new TreeMap<>(entityComparator); - final SortedMap<Entity, Referenceable> target = new TreeMap<>(entityComparator); + final SortedMap<ReadEntity, Referenceable> source = new TreeMap<>(entityComparator); + final SortedMap<WriteEntity, Referenceable> target = new TreeMap<>(entityComparator); final Set<String> dataSets = new HashSet<>(); final Set<Referenceable> entities = new LinkedHashSet<>(); @@ -577,16 +578,27 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { // filter out select queries which do not modify data if (!isSelectQuery) { - for (ReadEntity readEntity : event.getInputs()) { + + SortedSet<ReadEntity> sortedHiveInputs = new TreeSet<>(entityComparator);; + if ( event.getInputs() != null) { + sortedHiveInputs.addAll(event.getInputs()); + } + + SortedSet<WriteEntity> sortedHiveOutputs = new TreeSet<>(entityComparator); + if ( event.getOutputs() != null) { + sortedHiveOutputs.addAll(event.getOutputs()); + } + + for (ReadEntity readEntity : sortedHiveInputs) { processHiveEntity(dgiBridge, event, readEntity, dataSets, source, entities); } - for (WriteEntity writeEntity : event.getOutputs()) { + for (WriteEntity writeEntity : sortedHiveOutputs) { processHiveEntity(dgiBridge, event, writeEntity, dataSets, target, entities); } if (source.size() > 0 || target.size() > 0) { - Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, source, target); + Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, sortedHiveInputs, sortedHiveOutputs, source, target); entities.add(processReferenceable); event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities))); } else { @@ -597,8 +609,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } } - private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Set<String> dataSetsProcessed, - SortedMap<Entity, Referenceable> dataSets, Set<Referenceable> 
entities) throws Exception { + private <T extends Entity> void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, T entity, Set<String> dataSetsProcessed, + SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws Exception { if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) { final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable()); if (!dataSetsProcessed.contains(tblQFName)) { @@ -609,7 +621,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } } else if (entity.getType() == Type.DFS_DIR) { final String pathUri = lower(new Path(entity.getLocation()).toString()); - LOG.info("Registering DFS Path {} ", pathUri); + LOG.debug("Registering DFS Path {} ", pathUri); if (!dataSetsProcessed.contains(pathUri)) { Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri); dataSets.put(entity, hdfsPath); @@ -653,7 +665,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException { List<Referenceable> entities = new ArrayList<>(); - final Entity hiveEntity = getEntityByType(event.getOutputs(), Type.TABLE); + final WriteEntity hiveEntity = (WriteEntity) getEntityByType(event.getOutputs(), Type.TABLE); Table hiveTable = hiveEntity.getTable(); //Refresh to get the correct location hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName()); @@ -665,18 +677,25 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { dfsEntity.setTyp(Type.DFS_DIR); dfsEntity.setName(location); - SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator) {{ + SortedMap<ReadEntity, Referenceable> hiveInputsMap = new TreeMap<ReadEntity, Referenceable>(entityComparator) {{ put(dfsEntity, dgiBridge.fillHDFSDataSet(location)); }}; - SortedMap<Entity, Referenceable> outputs = new TreeMap<Entity, Referenceable>(entityComparator) {{ + SortedMap<WriteEntity, Referenceable> hiveOutputsMap = new TreeMap<WriteEntity, Referenceable>(entityComparator) {{ put(hiveEntity, tables.get(Type.TABLE)); }}; - Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs); + SortedSet<ReadEntity> sortedIps = new TreeSet<>(entityComparator); + sortedIps.addAll(hiveInputsMap.keySet()); + SortedSet<WriteEntity> sortedOps = new TreeSet<>(entityComparator); + sortedOps.addAll(hiveOutputsMap.keySet()); + + Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, + sortedIps, sortedOps, hiveInputsMap, hiveOutputsMap); String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable); if (isCreateOp(event)){ + LOG.info("Overriding process qualified name to {}", tableQualifiedName); processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName); } entities.addAll(tables.values()); @@ -689,6 +708,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation()) || HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation()) || HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation()) + || HiveOperation.ALTERTABLE_LOCATION.equals(hiveEvent.getOperation()) || HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation())) { return 
true; } @@ -696,11 +716,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent, - SortedMap<Entity, Referenceable> source, SortedMap<Entity, Referenceable> target) { + final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> source, SortedMap<WriteEntity, Referenceable> target) { Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName()); String queryStr = lower(hiveEvent.getQueryStr()); - processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent.getOperation(), source, target)); + processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent, sortedHiveInputs, sortedHiveOutputs, source, target)); LOG.debug("Registering query: {}", queryStr); List<Referenceable> sourceList = new ArrayList<>(source.values()); @@ -733,51 +753,113 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } @VisibleForTesting - static String getProcessQualifiedName(HiveOperation op, SortedMap<Entity, Referenceable> inputs, SortedMap<Entity, Referenceable> outputs) { + static String getProcessQualifiedName(HiveEventContext eventContext, final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> hiveInputsMap, SortedMap<WriteEntity, Referenceable> hiveOutputsMap) { + HiveOperation op = eventContext.getOperation(); StringBuilder buffer = new StringBuilder(op.getOperationName()); - addDatasets(op, buffer, inputs); + + boolean ignoreHDFSPathsinQFName = ignoreHDFSPathsinQFName(op, sortedHiveInputs, sortedHiveOutputs); + if ( ignoreHDFSPathsinQFName && LOG.isDebugEnabled()) { + LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, eventContext.getQueryStr()); + } + + addInputs(op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName); buffer.append(IO_SEP); - addDatasets(op, buffer, outputs); + addOutputs(op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName); LOG.info("Setting process qualified name to {}", buffer); return buffer.toString(); } - private static void addDatasets(HiveOperation op, StringBuilder buffer, final Map<Entity, Referenceable> refs) { - if (refs != null) { - for (Entity input : refs.keySet()) { - final Entity entity = input; + private static boolean ignoreHDFSPathsinQFName(final HiveOperation op, final Set<ReadEntity> inputs, final Set<WriteEntity> outputs) { + switch (op) { + case LOAD: + case IMPORT: + return isPartitionBasedQuery(outputs); + case EXPORT: + return isPartitionBasedQuery(inputs); + case QUERY: + return true; + } + return false; + } - //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations - if (addQueryType(op, entity)) { - buffer.append(SEP); - buffer.append(((WriteEntity) entity).getWriteType().name()); + private static boolean isPartitionBasedQuery(Set<? 
extends Entity> entities) { + for (Entity entity : entities) { + if (Type.PARTITION.equals(entity.getType())) { + return true; + } + } + return false; + } + + private static void addInputs(HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) { + if (refs != null) { + if (sortedInputs != null) { + Set<String> dataSetsProcessed = new LinkedHashSet<>(); + for (Entity input : sortedInputs) { + + if (!dataSetsProcessed.contains(input.getName().toLowerCase())) { + //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations + if (ignoreHDFSPathsInQFName && + (Type.DFS_DIR.equals(input.getType()) || Type.LOCAL_DIR.equals(input.getType()))) { + LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName()); + } else if (refs.containsKey(input)) { + addDataset(buffer, refs.get(input)); + } + dataSetsProcessed.add(input.getName().toLowerCase()); + } } - if (Type.DFS_DIR.equals(entity.getType()) || - Type.LOCAL_DIR.equals(entity.getType())) { - LOG.debug("Skipping dfs dir addition into process qualified name {} ", refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)); - } else { - buffer.append(SEP); - String dataSetQlfdName = (String) refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); - // '/' breaks query parsing on ATLAS - buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", "")); + + } + } + } + + private static void addDataset(StringBuilder buffer, Referenceable ref) { + buffer.append(SEP); + String dataSetQlfdName = (String) ref.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); + // '/' breaks query parsing on ATLAS + buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", "")); + } + + private static void addOutputs(HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) { + if (refs != null) { + Set<String> dataSetsProcessed = new LinkedHashSet<>(); + if (sortedOutputs != null) { + for (Entity output : sortedOutputs) { + final Entity entity = output; + if (!dataSetsProcessed.contains(output.getName().toLowerCase())) { + //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations + if (addQueryType(op, (WriteEntity) entity)) { + buffer.append(SEP); + buffer.append(((WriteEntity) entity).getWriteType().name()); + } + if (ignoreHDFSPathsInQFName && + (Type.DFS_DIR.equals(output.getType()) || Type.LOCAL_DIR.equals(output.getType()))) { + LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName()); + } else if (refs.containsKey(output)) { + addDataset(buffer, refs.get(output)); + } + dataSetsProcessed.add(output.getName().toLowerCase()); + } } } } } - private static boolean addQueryType(HiveOperation op, Entity entity) { - if (WriteEntity.class.isAssignableFrom(entity.getClass())) { - if (((WriteEntity) entity).getWriteType() != null && - op.equals(HiveOperation.QUERY)) { - switch (((WriteEntity) entity).getWriteType()) { - case INSERT: - case INSERT_OVERWRITE: - case UPDATE: - case DELETE: - case PATH_WRITE: + private static boolean addQueryType(HiveOperation op, WriteEntity entity) { + if (((WriteEntity) entity).getWriteType() != null && HiveOperation.QUERY.equals(op)) { + switch (((WriteEntity) entity).getWriteType()) { + case INSERT: + case INSERT_OVERWRITE: + case UPDATE: + case DELETE: + return true; + case 
PATH_WRITE: + //Add query type only for DFS paths and ignore local paths since they are not added as outputs + if ( !Type.LOCAL_DIR.equals(entity.getType())) { return true; - default: } + break; + default: } } return false; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index f9e1926..8ca47d9 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -62,15 +62,22 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; import static org.apache.atlas.AtlasClient.NAME; import static org.apache.atlas.hive.hook.HiveHook.entityComparator; import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName; import static org.apache.atlas.hive.hook.HiveHook.lower; +import static org.apache.atlas.hive.hook.HiveHook.IO_SEP; +import static org.apache.atlas.hive.hook.HiveHook.SEP; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -82,6 +89,8 @@ public class HiveHookIT { private static final String DGI_URL = "http://localhost:21000/"; private static final String CLUSTER_NAME = "test"; public static final String DEFAULT_DB = "default"; + + private static final String PART_FILE = "2015-01-01"; private Driver driver; private AtlasClient atlasClient; private HiveMetaStoreBridge hiveMetaStoreBridge; @@ -262,7 +271,7 @@ public class HiveHookIT { validateHDFSPaths(processReference, INPUTS, pFile); } - private List<Entity> getInputs(String inputName, Entity.Type entityType) { + private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) { final ReadEntity entity = new ReadEntity(); if ( Entity.Type.DFS_DIR.equals(entityType)) { @@ -270,14 +279,13 @@ public class HiveHookIT { entity.setTyp(Entity.Type.DFS_DIR); } else { entity.setName(getQualifiedTblName(inputName)); - entity.setTyp(Entity.Type.TABLE); + entity.setTyp(entityType); } - return new ArrayList<Entity>() {{ add(entity); }}; + return new LinkedHashSet<ReadEntity>() {{ add(entity); }}; } - - private List<Entity> getOutputs(String inputName, Entity.Type entityType) { + private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) { final WriteEntity entity = new WriteEntity(); if ( Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) { @@ -285,27 +293,32 @@ public class HiveHookIT { entity.setTyp(entityType); } else { entity.setName(getQualifiedTblName(inputName)); - entity.setTyp(Entity.Type.TABLE); + entity.setTyp(entityType); } - return new ArrayList<Entity>() {{ add(entity); }}; + return new LinkedHashSet<WriteEntity>() {{ add(entity); }}; } - - private void validateOutputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception { + private void validateOutputTables(Referenceable processReference, Set<WriteEntity> expectedTables) 
throws Exception { validateTables(processReference, OUTPUTS, expectedTables); } - private void validateInputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception { + private void validateInputTables(Referenceable processReference, Set<ReadEntity> expectedTables) throws Exception { validateTables(processReference, INPUTS, expectedTables); } - private void validateTables(Referenceable processReference, String attrName, List<Entity> expectedTables) throws Exception { + private void validateTables(Referenceable processReference, String attrName, Set<? extends Entity> expectedTables) throws Exception { List<Id> tableRef = (List<Id>) processReference.get(attrName); + + Iterator<? extends Entity> iterator = expectedTables.iterator(); for(int i = 0; i < expectedTables.size(); i++) { - Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId()); - LOG.debug("Validating output {} {} ", i, entity); - Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expectedTables.get(i).getName()); + Entity hiveEntity = iterator.next(); + if (Entity.Type.TABLE.equals(hiveEntity.getType()) || + Entity.Type.DFS_DIR.equals(hiveEntity.getType())) { + Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId()); + LOG.debug("Validating output {} {} ", i, entity); + Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), hiveEntity.getName()); + } } } @@ -338,18 +351,22 @@ public class HiveHookIT { String query = "create table " + ctasTableName + " as select * from " + tableName; runCommand(query); - final ReadEntity entity = new ReadEntity(); - entity.setName(getQualifiedTblName(tableName)); - entity.setTyp(Entity.Type.TABLE); + final Set<ReadEntity> readEntities = getInputs(tableName, Entity.Type.TABLE); + final Set<WriteEntity> writeEntities = getOutputs(ctasTableName, Entity.Type.TABLE); - final WriteEntity writeEntity = new WriteEntity(); - writeEntity.setTyp(Entity.Type.TABLE); - writeEntity.setName(getQualifiedTblName(ctasTableName)); - - assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, new ArrayList<Entity>() {{ add(entity); }}, new ArrayList<Entity>() {{ add(writeEntity); }}); + assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, readEntities, writeEntities)); assertTableIsRegistered(DEFAULT_DB, ctasTableName); } + private HiveHook.HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) { + HiveHook.HiveEventContext event = new HiveHook.HiveEventContext(); + event.setQueryStr(query); + event.setOperation(op); + event.setInputs(inputs); + event.setOutputs(outputs); + return event; + } + @Test public void testDropAndRecreateCTASOutput() throws Exception { String tableName = createTable(); @@ -359,10 +376,11 @@ public class HiveHookIT { assertTableIsRegistered(DEFAULT_DB, ctasTableName); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - List<Entity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE); - String processId = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); + final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); + String processId = assertProcessIsRegistered(hiveEventContext); final String drpquery = String.format("drop table %s 
", ctasTableName); runCommand(drpquery); @@ -371,14 +389,13 @@ public class HiveHookIT { //Fix after ATLAS-876 runCommand(query); assertTableIsRegistered(DEFAULT_DB, ctasTableName); - String process2Id = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs); + String process2Id = assertProcessIsRegistered(hiveEventContext, inputs, outputs); Assert.assertEquals(process2Id, processId); Referenceable processRef = atlasClient.getEntity(processId); - validateInputTables(processRef, inputs); - outputs.add(outputs.get(0)); + outputs.add(outputs.iterator().next()); validateOutputTables(processRef, outputs); } @@ -389,7 +406,7 @@ public class HiveHookIT { String query = "create view " + viewName + " as select * from " + tableName; runCommand(query); - assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)); + assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE))); assertTableIsRegistered(DEFAULT_DB, viewName); } @@ -403,7 +420,7 @@ public class HiveHookIT { runCommand(query); String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name); - assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)); + assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE))); String viewId = assertTableIsRegistered(DEFAULT_DB, viewName); //Check lineage which includes table1 @@ -419,7 +436,7 @@ public class HiveHookIT { runCommand(query); //Check if alter view process is reqistered - assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)); + assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE))); String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name); Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId); @@ -456,9 +473,7 @@ public class HiveHookIT { String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName; runCommand(query); - List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE); - - assertProcessIsRegistered(query, HiveOperation.LOAD, null, outputs); + assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE))); } @Test @@ -466,41 +481,56 @@ public class HiveHookIT { String tableName = createTable(true); String loadFile = file("load"); - String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')"; + String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; runCommand(query); - validateProcess(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)); + assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE))); } @Test - public void testLoadDFSPath() throws Exception { + public void testLoadDFSPathPartitioned() throws Exception { String tableName = createTable(true, true, false); - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + assertTableIsRegistered(DEFAULT_DB, 
tableName); - String loadFile = createTestDFSFile("loadDFSFile"); - String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')"; + final String loadFile = createTestDFSFile("loadDFSFile"); + String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; runCommand(query); - final List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE); - Referenceable processReference = validateProcess(query, HiveOperation.LOAD, getInputs(loadFile, Entity.Type.DFS_DIR), outputs); + final Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE); + final Set<ReadEntity> inputs = getInputs(loadFile, Entity.Type.DFS_DIR); - validateHDFSPaths(processReference, INPUTS, loadFile); + final Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs); + partitionOps.addAll(getOutputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION)); + Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.LOAD, inputs, partitionOps), inputs, outputs); + validateHDFSPaths(processReference, INPUTS, loadFile); validateOutputTables(processReference, outputs); + + final String loadFile2 = createTestDFSFile("loadDFSFile1"); + query = "load data inpath '" + loadFile2 + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')"; + runCommand(query); + + Set<ReadEntity> process2Inputs = getInputs(loadFile2, Entity.Type.DFS_DIR); + Set<ReadEntity> expectedInputs = new LinkedHashSet<>(); + expectedInputs.addAll(process2Inputs); + expectedInputs.addAll(inputs); + + validateProcess(constructEvent(query, HiveOperation.LOAD, expectedInputs, partitionOps), expectedInputs, outputs); + } private String getQualifiedTblName(String inputTable) { String inputtblQlfdName = inputTable; - if (inputTable != null && !inputTable.contains(".")) { + if (inputTable != null && !inputTable.contains("@")) { inputtblQlfdName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable); } return inputtblQlfdName; } - private Referenceable validateProcess(String query, HiveOperation op, List<Entity> inputTables, List<Entity> outputTables) throws Exception { - String processId = assertProcessIsRegistered(query, op, inputTables, outputTables); + private Referenceable validateProcess(HiveHook.HiveEventContext event, Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception { + String processId = assertProcessIsRegistered(event, inputTables, outputTables); Referenceable process = atlasClient.getEntity(processId); if (inputTables == null) { Assert.assertNull(process.get(INPUTS)); @@ -519,25 +549,47 @@ public class HiveHookIT { return process; } + private Referenceable validateProcess(HiveHook.HiveEventContext event) throws Exception { + return validateProcess(event, event.getInputs(), event.getOutputs()); + } + @Test public void testInsertIntoTable() throws Exception { - String tableName = createTable(); + String inputTable1Name = createTable(); + String inputTable2Name = createTable(); String insertTableName = createTable(); - assertTableIsRegistered(DEFAULT_DB, tableName); + assertTableIsRegistered(DEFAULT_DB, inputTable1Name); assertTableIsRegistered(DEFAULT_DB, insertTableName); - String query = "insert into " + insertTableName + " select id, name from " + tableName; + String query = "insert into " + insertTableName + " select t1.id, t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=t2.id"; 
runCommand(query); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); - ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT); + final Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE); + inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE)); + + Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); + (outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT); + + HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs); + + Set<ReadEntity> expectedInputs = new TreeSet<ReadEntity>(entityComparator) {{ + addAll(inputs); + }}; + assertTableIsRegistered(DEFAULT_DB, insertTableName); + Referenceable processRef1 = validateProcess(event, expectedInputs, outputs); - Referenceable processRef1 = validateProcess(query, HiveOperation.QUERY, inputs, outputs); + //Test sorting of tbl names + SortedSet<String> sortedTblNames = new TreeSet<>(); + sortedTblNames.add(getQualifiedTblName(inputTable1Name)); + sortedTblNames.add(getQualifiedTblName(inputTable2Name)); + + //Verify sorted orer of inputs in qualified name + Assert.assertEquals(Joiner.on(SEP).join("QUERY", sortedTblNames.first(), sortedTblNames.last()) + IO_SEP + SEP + Joiner.on(SEP).join(WriteEntity.WriteType.INSERT.name(), getQualifiedTblName(insertTableName)) + , processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)); //Rerun same query. Should result in same process runCommandWithDelay(query, 1000); - Referenceable processRef2 = validateProcess(query, HiveOperation.QUERY, inputs, outputs); + Referenceable processRef2 = validateProcess(event, expectedInputs, outputs); Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId()); } @@ -550,7 +602,7 @@ public class HiveHookIT { "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName; runCommand(query); - validateProcess(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null); + validateProcess(constructEvent(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null)); assertTableIsRegistered(DEFAULT_DB, tableName); } @@ -564,72 +616,78 @@ public class HiveHookIT { runCommand(query); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); - ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); + ((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE); - Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs); + final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, outputs); + Referenceable processReference = validateProcess(hiveEventContext); validateHDFSPaths(processReference, OUTPUTS, pFile1); String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); validateInputTables(processReference, inputs); //Rerun same query with same HDFS path - - runCommand(query); - Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, outputs); + runCommandWithDelay(query, 1000); + assertTableIsRegistered(DEFAULT_DB, tableName); + Referenceable process2Reference = validateProcess(hiveEventContext); 
validateHDFSPaths(process2Reference, OUTPUTS, pFile1); Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId()); - //Rerun same query with a new HDFS path. Will result in same process since HDFS paths are not part of qualifiedName. + //Rerun same query with a new HDFS path. Will result in same process since HDFS paths is not part of qualified name for QUERY operations final String pFile2 = createTestDFSPath("somedfspath2"); query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName; - runCommand(query); - List<Entity> p3Outputs = new ArrayList<Entity>() {{ + runCommandWithDelay(query, 1000); + assertTableIsRegistered(DEFAULT_DB, tableName); + Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{ addAll(getOutputs(pFile2, Entity.Type.DFS_DIR)); addAll(outputs); }}; - Referenceable process3Reference = validateProcess(query, HiveOperation.QUERY, inputs, p3Outputs); + Referenceable process3Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, p3Outputs)); validateHDFSPaths(process3Reference, OUTPUTS, pFile2); Assert.assertEquals(process3Reference.getId()._getId(), processReference.getId()._getId()); } @Test - public void testInsertIntoDFSDir() throws Exception { - String tableName = createTable(); + public void testInsertIntoDFSDirPartitioned() throws Exception { + + //Test with partitioned table + String tableName = createTable(true); String pFile1 = createTestDFSPath("somedfspath1"); String query = - "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName; + "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'"; runCommand(query); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); - ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR); + ((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE); - Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs); - validateHDFSPaths(processReference, OUTPUTS, pFile1); + final Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs); + partitionIps.addAll(getInputs(DEFAULT_DB + "@" + tableName + "@dt='" + PART_FILE + "'", Entity.Type.PARTITION)); - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, outputs), inputs, outputs); - validateInputTables(processReference, inputs); - - //Rerun same query with different HDFS path + //Rerun same query with different HDFS path. Should not create another process and should update it. 
final String pFile2 = createTestDFSPath("somedfspath2"); query = - "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName; + "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'"; runCommand(query); - List<Entity> p2Outputs = new ArrayList<Entity>() {{ - addAll(getOutputs(pFile2, Entity.Type.DFS_DIR)); + + final Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, Entity.Type.DFS_DIR); + ((WriteEntity)pFile2Outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE); + //Now the process has 2 paths - one older with deleted reference to partition and another with the the latest partition + Set<WriteEntity> p2Outputs = new LinkedHashSet<WriteEntity>() {{ + addAll(pFile2Outputs); addAll(outputs); }}; - Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, p2Outputs); + Referenceable process2Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, pFile2Outputs), inputs, p2Outputs); validateHDFSPaths(process2Reference, OUTPUTS, pFile2); Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId()); @@ -647,12 +705,12 @@ public class HiveHookIT { runCommand(query); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); - outputs.get(0).setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId())); - ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); + outputs.iterator().next().setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId())); + ((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT); - validateProcess(query, HiveOperation.QUERY, inputs, outputs); + validateProcess(constructEvent(query, HiveOperation.QUERY, inputs, outputs)); assertTableIsRegistered(DEFAULT_DB, tableName); assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true); @@ -660,21 +718,40 @@ public class HiveHookIT { @Test public void testInsertIntoPartition() throws Exception { - String tableName = createTable(true); - String insertTableName = createTable(true); + final boolean isPartitionedTable = true; + String tableName = createTable(isPartitionedTable); + String insertTableName = createTable(isPartitionedTable); String query = - "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName - + " where dt = '2015-01-01'"; + "insert into " + insertTableName + " partition(dt = '"+ PART_FILE + "') select id, name from " + tableName + + " where dt = '"+ PART_FILE + "'"; runCommand(query); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); - ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT); + final Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + final Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE); + ((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT); + + final Set<ReadEntity> partitionIps = new LinkedHashSet<ReadEntity>() { + { + addAll(inputs); + add(getPartitionInput()); + + } + }; + + final Set<WriteEntity> 
partitionOps = new LinkedHashSet<WriteEntity>() { + { + addAll(outputs); + add(getPartitionOutput()); + + } + }; - validateProcess(query, HiveOperation.QUERY, inputs, outputs); + validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, partitionOps), inputs, outputs); assertTableIsRegistered(DEFAULT_DB, tableName); assertTableIsRegistered(DEFAULT_DB, insertTableName); + + //TODO - update } private String random() { @@ -701,65 +778,111 @@ public class HiveHookIT { assertTableIsRegistered(DEFAULT_DB, tableName); - String filename = "pfile://" + mkdir("export"); + String filename = "pfile://" + mkdir("exportUnPartitioned"); String query = "export table " + tableName + " to \"" + filename + "\""; runCommand(query); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); + Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); - Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs); + Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs)); validateHDFSPaths(processReference, OUTPUTS, filename); validateInputTables(processReference, inputs); //Import - tableName = createTable(false); - assertTableIsRegistered(DEFAULT_DB, tableName); + String importTableName = createTable(false); + assertTableIsRegistered(DEFAULT_DB, importTableName); - query = "import table " + tableName + " from '" + filename + "'"; + query = "import table " + importTableName + " from '" + filename + "'"; runCommand(query); - outputs = getOutputs(tableName, Entity.Type.TABLE); - processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs); - validateHDFSPaths(processReference, INPUTS, filename); + outputs = getOutputs(importTableName, Entity.Type.TABLE); + validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs)); - validateOutputTables(processReference, outputs); + //Should create another process + filename = "pfile://" + mkdir("export2UnPartitioned"); + query = "export table " + tableName + " to \"" + filename + "\""; + runCommand(query); + + inputs = getInputs(tableName, Entity.Type.TABLE); + outputs = getOutputs(filename, Entity.Type.DFS_DIR); + + validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs)); + + //import again shouyld create another process + query = "import table " + importTableName + " from '" + filename + "'"; + runCommand(query); + outputs = getOutputs(importTableName, Entity.Type.TABLE); + validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs)); } @Test public void testExportImportPartitionedTable() throws Exception { - String tableName = createTable(true); - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + boolean isPartitionedTable = true; + final String tableName = createTable(isPartitionedTable); + assertTableIsRegistered(DEFAULT_DB, tableName); //Add a partition String partFile = "pfile://" + mkdir("partition"); - String query = "alter table " + tableName + " add partition (dt='2015-01-01') location '" + partFile + "'"; + String query = "alter table " + tableName + " add partition (dt='"+ PART_FILE + "') location '" + partFile + "'"; runCommand(query); String filename = "pfile://" + mkdir("export"); query = "export table " + tableName 
+ " to \"" + filename + "\""; runCommand(query); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); + final Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE); + final Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR); - Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs); - validateHDFSPaths(processReference, OUTPUTS, filename); + //Note that export has only partition as input in this case + final Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); + partitionIps.addAll(expectedExportInputs); - validateInputTables(processReference, inputs); + Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs), expectedExportInputs, outputs); + validateHDFSPaths(processReference, OUTPUTS, filename); //Import - tableName = createTable(true); - tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + String importTableName = createTable(true); + assertTableIsRegistered(DEFAULT_DB, tableName); - query = "import table " + tableName + " from '" + filename + "'"; + query = "import table " + importTableName + " from '" + filename + "'"; runCommand(query); - outputs = getOutputs(tableName, Entity.Type.TABLE); - processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs); - validateHDFSPaths(processReference, INPUTS, filename); + final Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR); + final Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE); - validateOutputTables(processReference, outputs); + final Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION); + partitionOps.addAll(importOutputs); + + validateProcess(constructEvent(query, HiveOperation.IMPORT, expectedImportInputs , partitionOps), expectedImportInputs, importOutputs); + + //Export should update same process + filename = "pfile://" + mkdir("export2"); + query = "export table " + tableName + " to \"" + filename + "\""; + runCommand(query); + + final Set<WriteEntity> outputs2 = getOutputs(filename, Entity.Type.DFS_DIR); + Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{ + addAll(outputs2); + addAll(outputs); + }}; + + validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs2), expectedExportInputs, p3Outputs); + + query = "alter table " + importTableName + " drop partition (dt='"+ PART_FILE + "')"; + runCommand(query); + + //Import should update same process + query = "import table " + importTableName + " from '" + filename + "'"; + runCommandWithDelay(query, 1000); + + final Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR); + final Set<ReadEntity> expectedImport2Inputs = new LinkedHashSet<ReadEntity>() {{ + addAll(importInputs); + addAll(expectedImportInputs); + }}; + + validateProcess(constructEvent(query, HiveOperation.IMPORT, importInputs, partitionOps), expectedImport2Inputs, importOutputs); } @Test @@ -767,13 +890,14 @@ public class HiveHookIT { String tableName = createTable(); String query = "select * from " + tableName; runCommand(query); - List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE); - assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null); + Set<ReadEntity> inputs = 
getInputs(tableName, Entity.Type.TABLE); + HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, null); + assertProcessIsNotRegistered(hiveEventContext); //check with uppercase table name query = "SELECT * from " + tableName.toUpperCase(); runCommand(query); - assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null); + assertProcessIsNotRegistered(hiveEventContext); } @Test @@ -1042,10 +1166,10 @@ public class HiveHookIT { String query = String.format("truncate table %s", tableName); runCommand(query); - List<Entity> outputs = getInputs(tableName, Entity.Type.TABLE); + Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE); String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - validateProcess(query, HiveOperation.TRUNCATETABLE, null, outputs); + validateProcess(constructEvent(query, HiveOperation.TRUNCATETABLE, null, outputs)); //Check lineage String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); @@ -1144,7 +1268,7 @@ public class HiveHookIT { String query = "alter table " + tableName + " set location '" + testPath + "'"; runCommand(query); - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { @Override public void assertOnEntity(Referenceable tableRef) throws Exception { Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC); @@ -1152,10 +1276,11 @@ public class HiveHookIT { } }); - List<Entity> inputs = getInputs(testPath, Entity.Type.DFS_DIR); - List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE); + String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, false), null); + + Referenceable processReference = atlasClient.getEntity(processId); - Referenceable processReference = validateProcess(query, HiveOperation.ALTERTABLE_LOCATION, inputs, outputs); validateHDFSPaths(processReference, INPUTS, testPath); } @@ -1302,6 +1427,20 @@ public class HiveHookIT { assertTableIsNotRegistered(DEFAULT_DB, tableName); } + private WriteEntity getPartitionOutput() { + WriteEntity partEntity = new WriteEntity(); + partEntity.setName(PART_FILE); + partEntity.setTyp(Entity.Type.PARTITION); + return partEntity; + } + + private ReadEntity getPartitionInput() { + ReadEntity partEntity = new ReadEntity(); + partEntity.setName(PART_FILE); + partEntity.setTyp(Entity.Type.PARTITION); + return partEntity; + } + @Test public void testDropDatabaseWithCascade() throws Exception { //Test Deletion of database and its corresponding tables @@ -1550,26 +1689,66 @@ public class HiveHookIT { } } - private String assertProcessIsRegistered(final String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception { - String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls)); - LOG.debug("Searching for process with query {}", processQFName); - return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() { - @Override - public void assertOnEntity(final Referenceable entity) throws Exception { - List<String> recentQueries = (List<String>) entity.get("recentQueries"); - Assert.assertEquals(recentQueries.get(0), 
lower(queryStr)); + private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws Exception { + try { + SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator); + SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator); + + if ( event.getInputs() != null) { + sortedHiveInputs.addAll(event.getInputs()); } - }); + if ( event.getOutputs() != null) { + sortedHiveOutputs.addAll(event.getOutputs()); + } + + String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs())); + LOG.debug("Searching for process with query {}", processQFName); + return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() { + @Override + public void assertOnEntity(final Referenceable entity) throws Exception { + List<String> recentQueries = (List<String>) entity.get("recentQueries"); + Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr())); + } + }); + } catch (Exception e) { + LOG.error("Exception : ", e); + throw e; + } + } + + private String assertProcessIsRegistered(final HiveHook.HiveEventContext event, final Set<ReadEntity> inputTbls, final Set<WriteEntity> outputTbls) throws Exception { + try { + SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator); + SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator); + if ( event.getInputs() != null) { + sortedHiveInputs.addAll(event.getInputs()); + } + if ( event.getOutputs() != null) { + sortedHiveOutputs.addAll(event.getOutputs()); + } + String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls)); + LOG.debug("Searching for process with query {}", processQFName); + return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() { + @Override + public void assertOnEntity(final Referenceable entity) throws Exception { + List<String> recentQueries = (List<String>) entity.get("recentQueries"); + Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr())); + } + }); + } catch(Exception e) { + LOG.error("Exception : ", e); + throw e; + } } private String getDSTypeName(Entity entity) { return Entity.Type.TABLE.equals(entity.getType()) ? 
HiveDataTypes.HIVE_TABLE.name() : FSDataTypes.HDFS_PATH().toString(); } - private SortedMap<Entity, Referenceable> getSortedProcessDataSets(List<Entity> inputTbls) { - SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator); + private <T extends Entity> SortedMap<T, Referenceable> getSortedProcessDataSets(Set<T> inputTbls) { + SortedMap<T, Referenceable> inputs = new TreeMap<T, Referenceable>(entityComparator); if (inputTbls != null) { - for (final Entity tbl : inputTbls) { + for (final T tbl : inputTbls) { Referenceable inputTableRef = new Referenceable(getDSTypeName(tbl), new HashMap<String, Object>() {{ put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tbl.getName()); }}); @@ -1579,10 +1758,22 @@ public class HiveHookIT { return inputs; } - private void assertProcessIsNotRegistered(String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception { - String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls)); - LOG.debug("Searching for process with query {}", processQFName); - assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName); + private void assertProcessIsNotRegistered(HiveHook.HiveEventContext event) throws Exception { + try { + SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator); + SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator); + if ( event.getInputs() != null) { + sortedHiveInputs.addAll(event.getInputs()); + } + if ( event.getOutputs() != null) { + sortedHiveOutputs.addAll(event.getOutputs()); + } + String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs())); + LOG.debug("Searching for process with query {}", processQFName); + assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName); + } catch( Exception e) { + LOG.error("Exception : ", e); + } } private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 8bbe2d7..09b1c4b 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -128,7 +128,7 @@ public abstract class AtlasHook { } catch (Exception e) { numRetries++; if (numRetries < maxRetries) { - LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e); + LOG.info("Failed to notify atlas for entity {}. 
Retrying", message, e); } else { if (shouldLogFailedMessages && e instanceof NotificationException) { List<String> failedMessages = ((NotificationException) e).getFailedMessages(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 5a6440c..5de6df1 100644 --- a/release-log.txt +++ b/release-log.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES: ALL CHANGES: ATLAS-966 Exit execution of import_hive.sh if HIVE_HOME is not set (svimal2106 via sumasai) +ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries (sumasai) --Release 0.7-incubating
