http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java index 0d163ee..7b881a3 100644 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java @@ -18,32 +18,56 @@ package org.apache.atlas.hive; +import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.hive.bridge.ColumnLineageUtils; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.hook.HiveHookIT; import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.ParamChecker; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.BeforeClass; import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; -import static org.apache.atlas.AtlasClient.NAME; -import static org.apache.atlas.hive.hook.HiveHook.lower; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; @@ -51,21 +75,27 @@ import static org.testng.Assert.fail; public class HiveITBase { private static final Logger LOG = LoggerFactory.getLogger(HiveITBase.class); - protected static final String DGI_URL = "http://localhost:21000/"; + public static final String DEFAULT_DB = "default"; + public static final String SEP = ":".intern(); + public static final String IO_SEP = "->".intern(); + protected static final String DGI_URL = "http://localhost:21000/"; protected static final String CLUSTER_NAME = "primary"; - public static final String DEFAULT_DB = "default"; + protected static final String PART_FILE = "2015-01-01"; + protected static final String INPUTS = "inputs";; + protected static final String OUTPUTS = "outputs"; - protected static final String PART_FILE = "2015-01-01"; - protected Driver driver; - protected AtlasClient atlasClient; + + protected Driver driver; + protected AtlasClient atlasClient; + protected AtlasClientV2 atlasClientV2; protected HiveMetaStoreBridge hiveMetaStoreBridge; - protected SessionState ss; + protected SessionState ss; + protected HiveConf conf; + protected Driver driverWithoutContext; - protected HiveConf conf; + private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName"; + private static final String ATTR_NAME = "name"; - protected static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS; - protected static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; - protected Driver driverWithoutContext; @BeforeClass public void setUp() throws Exception { @@ -86,12 +116,15 @@ public class HiveITBase { } if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + atlasClientV2 = new AtlasClientV2(atlasEndPoint, new String[]{"admin", "admin"}); atlasClient = new AtlasClient(atlasEndPoint, new String[]{"admin", "admin"}); } else { + atlasClientV2 = new AtlasClientV2(atlasEndPoint); atlasClient = new AtlasClient(atlasEndPoint); + } - hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClient); + hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClientV2); HiveConf conf = new HiveConf(); conf.set("hive.exec.post.hooks", ""); @@ -115,7 +148,6 @@ public class HiveITBase { protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) throws Exception { LOG.debug("Running command '{}'", cmd); - ss.setCommandType(null); CommandProcessorResponse response = driver.run(cmd); assertEquals(response.getResponseCode(), 0); if (sleepMs != 0) { @@ -127,6 +159,13 @@ public class HiveITBase { return "pfile://" + mkdir(path); } + protected String file(String tag) throws Exception { + String filename = System.getProperty("user.dir") + "/target/" + tag + "-data-" + random(); + File file = new File(filename); + file.createNewFile(); + return file.getAbsolutePath(); + } + protected String mkdir(String tag) throws Exception { String filename = "./target/" + tag + "-data-" + random(); File file = new File(filename); @@ -134,6 +173,13 @@ public class HiveITBase { return file.getAbsolutePath(); } + public static String lower(String str) { + if (StringUtils.isEmpty(str)) { + return null; + } + return str.toLowerCase().trim(); + } + protected String random() { return RandomStringUtils.randomAlphanumeric(10); } @@ -149,28 +195,48 @@ public class HiveITBase { protected String assertTableIsRegistered(String dbName, String tableName, HiveHookIT.AssertPredicate assertPredicate, boolean isTemporary) throws Exception { LOG.debug("Searching for table {}.{}", dbName, tableName); String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporary); - return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName, + return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName, assertPredicate); } protected String assertEntityIsRegistered(final String typeName, final String property, final String value, - final HiveHookIT.AssertPredicate assertPredicate) throws Exception { - waitFor(1000, new HiveHookIT.Predicate() { + final HiveHookIT.AssertPredicate assertPredicate) throws Exception { + waitFor(80000, new HiveHookIT.Predicate() { @Override public void evaluate() throws Exception { - Referenceable entity = atlasClient.getEntity(typeName, property, value); + AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property,value)); + AtlasEntity entity = atlasEntityWithExtInfo.getEntity(); assertNotNull(entity); if (assertPredicate != null) { assertPredicate.assertOnEntity(entity); } } }); - Referenceable entity = atlasClient.getEntity(typeName, property, value); - return entity.getId()._getId(); + AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property,value)); + AtlasEntity entity = atlasEntityWithExtInfo.getEntity(); + return (String) entity.getGuid(); + } + + protected AtlasEntity assertEntityIsRegistedViaEntity(final String typeName, final String property, final String value, + final HiveHookIT.AssertPredicate assertPredicate) throws Exception { + waitFor(80000, new HiveHookIT.Predicate() { + @Override + public void evaluate() throws Exception { + AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property,value)); + AtlasEntity entity = atlasEntityWithExtInfo.getEntity(); + assertNotNull(entity); + if (assertPredicate != null) { + assertPredicate.assertOnEntity(entity); + } + } + }); + AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property,value)); + AtlasEntity entity = atlasEntityWithExtInfo.getEntity(); + return entity; } public interface AssertPredicate { - void assertOnEntity(Referenceable entity) throws Exception; + void assertOnEntity(AtlasEntity entity) throws Exception; } public interface Predicate { @@ -209,28 +275,30 @@ public class HiveITBase { protected String getTableProcessQualifiedName(String dbName, String tableName) throws Exception { return HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, - hiveMetaStoreBridge.hiveClient.getTable(dbName, tableName)); + hiveMetaStoreBridge.getHiveClient().getTable(dbName, tableName)); } - protected void validateHDFSPaths(Referenceable processReference, String attributeName, String... testPaths) throws Exception { - List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName); + protected void validateHDFSPaths(AtlasEntity processEntity, String attributeName, String... testPaths) throws Exception { + List<AtlasObjectId> hdfsPathIds = toAtlasObjectIdList(processEntity.getAttribute(attributeName)); for (String testPath : testPaths) { - final Path path = new Path(testPath); - final String testPathNormed = lower(path.toString()); - String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed); - Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId); + Path path = new Path(testPath); + String testPathNormed = lower(path.toString()); + String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed); - Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId); - Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed); - Assert.assertEquals(hdfsPathRef.get(NAME), Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); - Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed); + Assert.assertEquals(hdfsPathIds.get(0).getGuid(), hdfsPathId); } } - private String assertHDFSPathIsRegistered(String path) throws Exception { + protected String assertHDFSPathIsRegistered(String path) throws Exception { LOG.debug("Searching for hdfs path {}", path); - return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, path, null); + // ATLAS-2444 HDFS name node federation adds the cluster name to the qualifiedName + if (path.startsWith("hdfs://")) { + String pathWithCluster = path + "@" + CLUSTER_NAME; + return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, REFERENCEABLE_ATTRIBUTE_NAME, pathWithCluster, null); + } else { + return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, REFERENCEABLE_ATTRIBUTE_NAME, path, null); + } } protected String assertDatabaseIsRegistered(String dbName) throws Exception { @@ -240,7 +308,419 @@ public class HiveITBase { protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception { LOG.debug("Searching for database {}", dbName); String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName); - return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName, assertPredicate); } -} + + + protected AtlasEntity getAtlasEntityByType(String type, String id) throws Exception { + AtlasEntity atlasEntity = null; + AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfoForProcess = atlasClientV2.getEntityByAttribute(type, + Collections.singletonMap(AtlasClient.GUID, id)); + atlasEntity = atlasEntityWithExtInfoForProcess.getEntity(); + return atlasEntity; + } + + + public static class HiveEventContext { + private Set<ReadEntity> inputs; + private Set<WriteEntity> outputs; + + private String user; + private UserGroupInformation ugi; + private HiveOperation operation; + private HookContext.HookType hookType; + private JSONObject jsonPlan; + private String queryId; + private String queryStr; + private Long queryStartTime; + + public Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo; + + private List<HookNotificationMessage> messages = new ArrayList<>(); + + public void setInputs(Set<ReadEntity> inputs) { + this.inputs = inputs; + } + + public void setOutputs(Set<WriteEntity> outputs) { + this.outputs = outputs; + } + + public void setUser(String user) { + this.user = user; + } + + public void setUgi(UserGroupInformation ugi) { + this.ugi = ugi; + } + + public void setOperation(HiveOperation operation) { + this.operation = operation; + } + + public void setHookType(HookContext.HookType hookType) { + this.hookType = hookType; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public void setQueryStr(String queryStr) { + this.queryStr = queryStr; + } + + public void setQueryStartTime(Long queryStartTime) { + this.queryStartTime = queryStartTime; + } + + public void setLineageInfo(LineageInfo lineageInfo){ + try { + this.lineageInfo = ColumnLineageUtils.buildLineageMap(lineageInfo); + LOG.debug("Column Lineage Map => {} ", this.lineageInfo.entrySet()); + }catch (Throwable e){ + LOG.warn("Column Lineage Map build failed with exception {}", e); + } + } + + public Set<ReadEntity> getInputs() { + return inputs; + } + + public Set<WriteEntity> getOutputs() { + return outputs; + } + + public String getUser() { + return user; + } + + public UserGroupInformation getUgi() { + return ugi; + } + + public HiveOperation getOperation() { + return operation; + } + + public HookContext.HookType getHookType() { + return hookType; + } + + public String getQueryId() { + return queryId; + } + + public String getQueryStr() { + return queryStr; + } + + public Long getQueryStartTime() { + return queryStartTime; + } + + public void addMessage(HookNotificationMessage message) { + messages.add(message); + } + + public List<HookNotificationMessage> getMessages() { + return messages; + } + } + + + @VisibleForTesting + protected static String getProcessQualifiedName(HiveMetaStoreBridge dgiBridge, HiveEventContext eventContext, + final SortedSet<ReadEntity> sortedHiveInputs, + final SortedSet<WriteEntity> sortedHiveOutputs, + SortedMap<ReadEntity, AtlasEntity> hiveInputsMap, + SortedMap<WriteEntity, AtlasEntity> hiveOutputsMap) throws HiveException { + HiveOperation op = eventContext.getOperation(); + if (isCreateOp(eventContext)) { + Entity entity = getEntityByType(sortedHiveOutputs, Entity.Type.TABLE); + + if (entity != null) { + Table outTable = entity.getTable(); + //refresh table + outTable = dgiBridge.getHiveClient().getTable(outTable.getDbName(), outTable.getTableName()); + return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(), outTable); + } + } + + StringBuilder buffer = new StringBuilder(op.getOperationName()); + + boolean ignoreHDFSPathsinQFName = ignoreHDFSPathsinQFName(op, sortedHiveInputs, sortedHiveOutputs); + if ( ignoreHDFSPathsinQFName && LOG.isDebugEnabled()) { + LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, eventContext.getQueryStr()); + } + + addInputs(dgiBridge, op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName); + buffer.append(IO_SEP); + addOutputs(dgiBridge, op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName); + LOG.info("Setting process qualified name to {}", buffer); + return buffer.toString(); + } + + + protected static Entity getEntityByType(Set<? extends Entity> entities, Entity.Type entityType) { + for (Entity entity : entities) { + if (entity.getType() == entityType) { + return entity; + } + } + return null; + } + + + protected static boolean ignoreHDFSPathsinQFName(final HiveOperation op, final Set<ReadEntity> inputs, final Set<WriteEntity> outputs) { + switch (op) { + case LOAD: + case IMPORT: + return isPartitionBasedQuery(outputs); + case EXPORT: + return isPartitionBasedQuery(inputs); + case QUERY: + return true; + } + return false; + } + + protected static boolean isPartitionBasedQuery(Set<? extends Entity> entities) { + for (Entity entity : entities) { + if (Entity.Type.PARTITION.equals(entity.getType())) { + return true; + } + } + return false; + } + + protected static boolean isCreateOp(HiveEventContext hiveEvent) { + return HiveOperation.CREATETABLE.equals(hiveEvent.getOperation()) + || HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation()) + || HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation()) + || HiveOperation.ALTERTABLE_LOCATION.equals(hiveEvent.getOperation()) + || HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation()); + } + + protected static void addInputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, AtlasEntity> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException { + if (refs != null) { + if (sortedInputs != null) { + Set<String> dataSetsProcessed = new LinkedHashSet<>(); + for (Entity input : sortedInputs) { + + if (!dataSetsProcessed.contains(input.getName().toLowerCase())) { + //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations + if (ignoreHDFSPathsInQFName && + (Entity.Type.DFS_DIR.equals(input.getType()) || Entity.Type.LOCAL_DIR.equals(input.getType()))) { + LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName()); + } else if (refs.containsKey(input)) { + if ( input.getType() == Entity.Type.PARTITION || input.getType() == Entity.Type.TABLE) { + Table inputTable = refreshTable(hiveBridge, input.getTable().getDbName(), input.getTable().getTableName()); + + if (inputTable != null) { + addDataset(buffer, refs.get(input), HiveMetaStoreBridge.getTableCreatedTime(inputTable)); + } + } else { + addDataset(buffer, refs.get(input)); + } + } + + dataSetsProcessed.add(input.getName().toLowerCase()); + } + } + + } + } + } + + protected static void addDataset(StringBuilder buffer, AtlasEntity ref, final long createTime) { + addDataset(buffer, ref); + buffer.append(SEP); + buffer.append(createTime); + } + + protected static void addDataset(StringBuilder buffer, AtlasEntity ref) { + buffer.append(SEP); + String dataSetQlfdName = (String) ref.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); + // '/' breaks query parsing on ATLAS + buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", "")); + } + + protected static void addOutputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, AtlasEntity> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException { + if (refs != null) { + Set<String> dataSetsProcessed = new LinkedHashSet<>(); + if (sortedOutputs != null) { + for (WriteEntity output : sortedOutputs) { + final Entity entity = output; + if (!dataSetsProcessed.contains(output.getName().toLowerCase())) { + if (ignoreHDFSPathsInQFName && + (Entity.Type.DFS_DIR.equals(output.getType()) || Entity.Type.LOCAL_DIR.equals(output.getType()))) { + LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName()); + } else if (refs.containsKey(output)) { + //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations + if (addQueryType(op, (WriteEntity) entity)) { + buffer.append(SEP); + buffer.append(((WriteEntity) entity).getWriteType().name()); + } + + if (output.getType() == Entity.Type.PARTITION || output.getType() == Entity.Type.TABLE) { + Table outputTable = refreshTable(hiveBridge, output.getTable().getDbName(), output.getTable().getTableName()); + + if (outputTable != null) { + addDataset(buffer, refs.get(output), HiveMetaStoreBridge.getTableCreatedTime(outputTable)); + } + } else { + addDataset(buffer, refs.get(output)); + } + } + + dataSetsProcessed.add(output.getName().toLowerCase()); + } + } + } + } + } + + protected static Table refreshTable(HiveMetaStoreBridge dgiBridge, String dbName, String tableName) { + try { + return dgiBridge.getHiveClient().getTable(dbName, tableName); + } catch (HiveException excp) { // this might be the case for temp tables + LOG.warn("failed to get details for table {}.{}. Ignoring. {}: {}", dbName, tableName, excp.getClass().getCanonicalName(), excp.getMessage()); + } + + return null; + } + + protected static boolean addQueryType(HiveOperation op, WriteEntity entity) { + if (entity.getWriteType() != null && HiveOperation.QUERY.equals(op)) { + switch (entity.getWriteType()) { + case INSERT: + case INSERT_OVERWRITE: + case UPDATE: + case DELETE: + return true; + case PATH_WRITE: + //Add query type only for DFS paths and ignore local paths since they are not added as outputs + if ( !Entity.Type.LOCAL_DIR.equals(entity.getType())) { + return true; + } + break; + default: + } + } + return false; + } + + + @VisibleForTesting + protected static final class EntityComparator implements Comparator<Entity> { + @Override + public int compare(Entity o1, Entity o2) { + String s1 = o1.getName(); + String s2 = o2.getName(); + if (s1 == null || s2 == null){ + s1 = o1.getD().toString(); + s2 = o2.getD().toString(); + } + return s1.toLowerCase().compareTo(s2.toLowerCase()); + } + } + + @VisibleForTesting + protected static final Comparator<Entity> entityComparator = new EntityComparator(); + + protected AtlasObjectId toAtlasObjectId(Object obj) { + final AtlasObjectId ret; + + if (obj instanceof AtlasObjectId) { + ret = (AtlasObjectId) obj; + } else if (obj instanceof Map) { + ret = new AtlasObjectId((Map) obj); + } else if (obj != null) { + ret = new AtlasObjectId(obj.toString()); // guid + } else { + ret = null; + } + + return ret; + } + + protected List<AtlasObjectId> toAtlasObjectIdList(Object obj) { + final List<AtlasObjectId> ret; + + if (obj instanceof Collection) { + Collection coll = (Collection) obj; + + ret = new ArrayList<>(coll.size()); + + for (Object item : coll) { + AtlasObjectId objId = toAtlasObjectId(item); + + if (objId != null) { + ret.add(objId); + } + } + } else { + AtlasObjectId objId = toAtlasObjectId(obj); + + if (objId != null) { + ret = new ArrayList<>(1); + + ret.add(objId); + } else { + ret = null; + } + } + + return ret; + } + + + protected AtlasStruct toAtlasStruct(Object obj) { + final AtlasStruct ret; + + if (obj instanceof AtlasStruct) { + ret = (AtlasStruct) obj; + } else if (obj instanceof Map) { + Map map = (Map) obj; + Object typeName = map.get("typeName"); + Map attributes = (map.get("attributes") instanceof Map) ? (Map) map.get("attributes") : map; + + ret = new AtlasStruct(typeName == null ? "" : typeName.toString(), attributes); + } else { + ret = null; + } + + return ret; + } + + protected List<AtlasStruct> toAtlasStructList(Object obj) { + final List<AtlasStruct> ret; + + if (obj instanceof Collection) { + Collection coll = (Collection) obj; + + ret = new ArrayList<>(coll.size()); + + for (Object item : coll) { + AtlasStruct struct = toAtlasStruct(item); + + if (struct != null) { + ret.add(struct); + } + } + } else { + AtlasStruct struct = toAtlasStruct(obj); + + if (struct != null) { + ret = new ArrayList<>(1); + + ret.add(struct); + } else { + ret = null; + } + } + + return ret; + }}
http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java new file mode 100644 index 0000000..dc14480 --- /dev/null +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hive.bridge; + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.hive.hook.events.BaseHiveEvent; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ColumnLineageUtils { + public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class); + public static class HiveColumnLineageInfo { + public final String depenendencyType; + public final String expr; + public final String inputColumn; + + HiveColumnLineageInfo(LineageInfo.Dependency d, String inputCol) { + depenendencyType = d.getType().name(); + expr = d.getExpr(); + inputColumn = inputCol; + } + + @Override + public String toString(){ + return inputColumn; + } + } + + public static String getQualifiedName(LineageInfo.DependencyKey key){ + String db = key.getDataContainer().getTable().getDbName(); + String table = key.getDataContainer().getTable().getTableName(); + String col = key.getFieldSchema().getName(); + return db + "." + table + "." + col; + } + + public static Map<String, List<HiveColumnLineageInfo>> buildLineageMap(LineageInfo lInfo) { + Map<String, List<HiveColumnLineageInfo>> m = new HashMap<>(); + + for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) { + List<HiveColumnLineageInfo> l = new ArrayList<>(); + String k = getQualifiedName(e.getKey()); + + if (LOG.isDebugEnabled()) { + LOG.debug("buildLineageMap(): key={}; value={}", e.getKey(), e.getValue()); + } + + Collection<LineageInfo.BaseColumnInfo> baseCols = getBaseCols(e.getValue()); + + if (baseCols != null) { + for (LineageInfo.BaseColumnInfo iCol : baseCols) { + String db = iCol.getTabAlias().getTable().getDbName(); + String table = iCol.getTabAlias().getTable().getTableName(); + String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName(); + l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k); + } + m.put(k, l); + } + } + return m; + } + + static Collection<LineageInfo.BaseColumnInfo> getBaseCols(LineageInfo.Dependency lInfoDep) { + Collection<LineageInfo.BaseColumnInfo> ret = null; + + if (lInfoDep != null) { + try { + Method getBaseColsMethod = lInfoDep.getClass().getMethod("getBaseCols"); + + Object retGetBaseCols = getBaseColsMethod.invoke(lInfoDep); + + if (retGetBaseCols != null) { + if (retGetBaseCols instanceof Collection) { + ret = (Collection) retGetBaseCols; + } else { + LOG.warn("{}: unexpected return type from LineageInfo.Dependency.getBaseCols(), expected type {}", + retGetBaseCols.getClass().getName(), "Collection"); + } + } + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) { + LOG.warn("getBaseCols()", ex); + } + } + + return ret; + } + + static String[] extractComponents(String qualifiedName) { + String[] comps = qualifiedName.split("\\."); + int lastIdx = comps.length - 1; + int atLoc = comps[lastIdx].indexOf('@'); + if (atLoc > 0) { + comps[lastIdx] = comps[lastIdx].substring(0, atLoc); + } + return comps; + } + + static void populateColumnReferenceableMap(Map<String, Referenceable> m, + Referenceable r) { + if (r.getTypeName().equals(HiveDataTypes.HIVE_TABLE.getName())) { + String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); + String[] qNameComps = extractComponents(qName); + for (Referenceable col : (List<Referenceable>) r.get(BaseHiveEvent.ATTRIBUTE_COLUMNS)) { + String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); + String[] colQNameComps = extractComponents(cName); + String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2]; + m.put(colQName, col); + } + String tableQName = qNameComps[0] + "." + qNameComps[1]; + m.put(tableQName, r); + } + } + + + public static Map<String, Referenceable> buildColumnReferenceableMap(List<Referenceable> inputs, + List<Referenceable> outputs) { + Map<String, Referenceable> m = new HashMap<>(); + + for (Referenceable r : inputs) { + populateColumnReferenceableMap(m, r); + } + + for (Referenceable r : outputs) { + populateColumnReferenceableMap(m, r); + } + + return m; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java deleted file mode 100644 index f4abfb6..0000000 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.hive.bridge; - -import org.apache.atlas.hive.hook.HiveHook; -import org.apache.atlas.hive.rewrite.HiveASTRewriter; -import org.apache.atlas.hive.rewrite.RewriteException; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Test(enabled = false) -public class HiveLiteralRewriterTest { - - private HiveConf conf; - - @BeforeClass(enabled = false) - public void setup() { - conf = new HiveConf(); - conf.addResource("/hive-site.xml"); - SessionState ss = new SessionState(conf, "testuser"); - SessionState.start(ss); - conf.set("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"); - } - - @Test(enabled=false) - public void testLiteralRewrite() throws RewriteException { - HiveHook.HiveEventContext ctx = new HiveHook.HiveEventContext(); - ctx.setQueryStr("insert into table testTable partition(dt='2014-01-01') select * from test1 where dt = '2014-01-01'" + - " and intColumn = 10" + - " and decimalColumn = 1.10" + - " and charColumn = 'a'" + - " and hexColumn = unhex('\\0xFF')" + - " and expColumn = cast('-1.5e2' as int)" + - " and boolCol = true"); - - HiveASTRewriter queryRewriter = new HiveASTRewriter(conf); - String result = queryRewriter.rewrite(ctx.getQueryStr()); - System.out.println("normlized sql : " + result); - - final String normalizedSQL = "insert into table testTable partition(dt='STRING_LITERAL') " + - "select * from test1 where dt = 'STRING_LITERAL' " + - "and intColumn = NUMBER_LITERAL " + - "and decimalColumn = NUMBER_LITERAL and " + - "charColumn = 'STRING_LITERAL' and " + - "hexColumn = unhex('STRING_LITERAL') and " + - "expColumn = cast('STRING_LITERAL' as int) and " + - "boolCol = BOOLEAN_LITERAL"; - Assert.assertEquals(result, normalizedSQL); - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java index 0256cf3..d42182e 100644 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java @@ -19,9 +19,11 @@ package org.apache.atlas.hive.bridge; import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; @@ -31,20 +33,21 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.mapred.TextInputFormat; -import org.codehaus.jettison.json.JSONException; import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import scala.actors.threadpool.Arrays; +import java.util.Arrays; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.eq; +import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -61,6 +64,18 @@ public class HiveMetaStoreBridgeTest { @Mock private AtlasClient atlasClient; + @Mock + private AtlasClientV2 atlasClientV2; + + @Mock + private AtlasEntity atlasEntity; + + @Mock + private AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo; + + @Mock + EntityMutationResponse entityMutationResponse; + @BeforeMethod public void initializeMocks() { MockitoAnnotations.initMocks(this); @@ -71,19 +86,21 @@ public class HiveMetaStoreBridgeTest { // setup database when(hiveClient.getAllDatabases()).thenReturn(Arrays.asList(new String[]{TEST_DB_NAME})); String description = "This is a default database"; - when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn( - new Database(TEST_DB_NAME, description, "/user/hive/default", null)); + Database db = new Database(TEST_DB_NAME, description, "/user/hive/default", null); + when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(db); when(hiveClient.getAllTables(TEST_DB_NAME)).thenReturn(Arrays.asList(new String[]{})); - returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient); + when(atlasEntityWithExtInfo.getEntity("72e06b34-9151-4023-aa9d-b82103a50e76")) + .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity()); + + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); bridge.importHiveMetadata(true); // verify update is called - verify(atlasClient).updateEntity(eq("72e06b34-9151-4023-aa9d-b82103a50e76"), - (Referenceable) argThat( - new MatchesReferenceableProperty(HiveMetaStoreBridge.DESCRIPTION_ATTR, description))); + verify(atlasClientV2).updateEntity((AtlasEntity.AtlasEntityWithExtInfo)anyObject()); } @Test @@ -92,32 +109,50 @@ public class HiveMetaStoreBridgeTest { List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME); - returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); // return existing table - when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), - AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME))) - .thenReturn(getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77")); - when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference()); + + when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")) + .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))).getEntity()); + + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))) + .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); + + when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")) + .thenReturn(createTableReference()); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(0)); - when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), - AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77")); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient); + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + processQualifiedName))) + .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); + + + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); bridge.importHiveMetadata(true); // verify update is called on table - verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"), - (Referenceable) argThat(new MatchesReferenceableProperty(HiveMetaStoreBridge.TABLE_TYPE_ATTR, - TableType.EXTERNAL_TABLE.name()))); + verify(atlasClientV2, times(2)).updateEntity((AtlasEntity.AtlasEntityWithExtInfo)anyObject()); + } - private void returnExistingDatabase(String databaseName, AtlasClient atlasClient, String clusterName) - throws AtlasServiceException, JSONException { - when(atlasClient.getEntity( - HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getDBQualifiedName(clusterName, databaseName))).thenReturn( - getEntityReference(HiveDataTypes.HIVE_DB.getName(), "72e06b34-9151-4023-aa9d-b82103a50e76")); + private void returnExistingDatabase(String databaseName, AtlasClientV2 atlasClientV2, String clusterName) + throws AtlasServiceException { + //getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"); + + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_DB.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, TEST_DB_NAME)))) + .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76")))); + } private List<Table> setupTables(Hive hiveClient, String databaseName, String... tableNames) throws HiveException { @@ -143,15 +178,25 @@ public class HiveMetaStoreBridgeTest { List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME); Table hiveTable = hiveTables.get(0); - returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); + + + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))) + .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); - when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME))).thenReturn( - getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77")); String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTable); - when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77")); - when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference()); + + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + processQualifiedName))) + .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); + + when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")) + .thenReturn(createTableReference()); Partition partition = mock(Partition.class); when(partition.getTable()).thenReturn(hiveTable); @@ -160,7 +205,7 @@ public class HiveMetaStoreBridgeTest { when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new Partition[]{partition})); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient); + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); try { bridge.importHiveMetadata(true); } catch (Exception e) { @@ -174,18 +219,27 @@ public class HiveMetaStoreBridgeTest { final String table2Name = TEST_TABLE_NAME + "_1"; List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name); - returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore")); - when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, - table2Name))).thenReturn( - getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77")); - when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference()); + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))) + .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); + + when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")) + .thenReturn(createTableReference()); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1)); - when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77")); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient); + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + processQualifiedName))) + .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); + + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); try { bridge.importHiveMetadata(false); } catch (Exception e) { @@ -199,18 +253,29 @@ public class HiveMetaStoreBridgeTest { final String table2Name = TEST_TABLE_NAME + "_1"; List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name); - returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore")); - when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, - table2Name))).thenReturn( - getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77")); - when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference()); - String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1)); - when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(), "82e06b34-9151-4023-aa9d-b82103a50e77")); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient); + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))) + .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); + + + when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")) + .thenReturn(createTableReference()); + + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1)); + + when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), + Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + processQualifiedName))) + .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( + getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); + + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); try { bridge.importHiveMetadata(true); Assert.fail("Table registration is supposed to fail"); @@ -219,15 +284,15 @@ public class HiveMetaStoreBridgeTest { } } - private Referenceable getEntityReference(String typeName, String id) throws JSONException { - return new Referenceable(id, typeName, null); + private AtlasEntity getEntity(String typeName, String attr, String value) { + return new AtlasEntity(typeName, attr, value); } - private Referenceable createTableReference() { - Referenceable tableReference = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); - Referenceable sdReference = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName()); - tableReference.set(HiveMetaStoreBridge.STORAGE_DESC, sdReference); - return tableReference; + private AtlasEntity createTableReference() { + AtlasEntity tableEntity = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()); + AtlasEntity sdEntity = new AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName()); + tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sdEntity)); + return tableEntity; } private Table createTestTable(String databaseName, String tableName) throws HiveException { @@ -253,7 +318,7 @@ public class HiveMetaStoreBridgeTest { @Override public boolean matches(Object o) { - return attrValue.equals(((Referenceable) o).get(attrName)); + return attrValue.equals(((AtlasEntity) o).getAttribute(attrName)); } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java index d09db1b..a5b1f4d 100644 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java @@ -21,8 +21,8 @@ package org.apache.atlas.hive.bridge; import org.apache.atlas.AtlasClient; import org.apache.atlas.hive.HiveITBase; import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; import org.testng.annotations.Test; import java.util.List; @@ -34,34 +34,36 @@ public class HiveMetastoreBridgeIT extends HiveITBase { @Test public void testCreateTableAndImport() throws Exception { String tableName = tableName(); + String pFile = createTestDFSPath("parentPath"); + String query = String.format("create EXTERNAL table %s(id string, cnt int) location '%s'", tableName, pFile); - String pFile = createTestDFSPath("parentPath"); - final String query = String.format("create EXTERNAL table %s(id string, cnt int) location '%s'", tableName, pFile); runCommand(query); - String dbId = assertDatabaseIsRegistered(DEFAULT_DB); + + String dbId = assertDatabaseIsRegistered(DEFAULT_DB); String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); //verify lineage is created - String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), - AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - getTableProcessQualifiedName(DEFAULT_DB, tableName), null); - Referenceable processReference = atlasClient.getEntity(processId); - validateHDFSPaths(processReference, INPUTS, pFile); + String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getTableProcessQualifiedName(DEFAULT_DB, tableName), null); + AtlasEntity processsEntity = atlasClientV2.getEntityByGuid(processId).getEntity(); + + validateHDFSPaths(processsEntity, INPUTS, pFile); + + List<AtlasObjectId> outputs = toAtlasObjectIdList(processsEntity.getAttribute(OUTPUTS)); - List<Id> outputs = (List<Id>) processReference.get(OUTPUTS); assertEquals(outputs.size(), 1); - assertEquals(outputs.get(0).getId()._getId(), tableId); + assertEquals(outputs.get(0).getGuid(), tableId); int tableCount = atlasClient.listEntities(HiveDataTypes.HIVE_TABLE.getName()).size(); //Now import using import tool - should be no-op. This also tests update since table exists - hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), DEFAULT_DB, tableName, true); + AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity(); + + hiveMetaStoreBridge.importTable(dbEntity, DEFAULT_DB, tableName, true); + String tableId2 = assertTableIsRegistered(DEFAULT_DB, tableName); assertEquals(tableId2, tableId); - String processId2 = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), - AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - getTableProcessQualifiedName(DEFAULT_DB, tableName), null); + String processId2 = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getTableProcessQualifiedName(DEFAULT_DB, tableName), null); assertEquals(processId2, processId); //assert that table is de-duped and no new entity is created @@ -72,18 +74,23 @@ public class HiveMetastoreBridgeIT extends HiveITBase { @Test public void testImportCreatedTable() throws Exception { String tableName = tableName(); - String pFile = createTestDFSPath("parentPath"); + String pFile = createTestDFSPath("parentPath"); + runCommand(driverWithoutContext, String.format("create EXTERNAL table %s(id string) location '%s'", tableName, pFile)); + String dbId = assertDatabaseIsRegistered(DEFAULT_DB); - hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), DEFAULT_DB, tableName, true); + AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity(); + + hiveMetaStoreBridge.importTable(dbEntity, DEFAULT_DB, tableName, true); + String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), - AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - getTableProcessQualifiedName(DEFAULT_DB, tableName), null); - List<Id> outputs = (List<Id>) atlasClient.getEntity(processId).get(OUTPUTS); + String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getTableProcessQualifiedName(DEFAULT_DB, tableName), null); + AtlasEntity processEntity = atlasClientV2.getEntityByGuid(processId).getEntity(); + List<AtlasObjectId> outputs = toAtlasObjectIdList(processEntity.getAttribute(OUTPUTS)); + assertEquals(outputs.size(), 1); - assertEquals(outputs.get(0).getId()._getId(), tableId); + assertEquals(outputs.get(0).getGuid(), tableId); } }
