http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java 
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
index 0d163ee..7b881a3 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
@@ -18,32 +18,56 @@
 
 package org.apache.atlas.hive;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.hive.bridge.ColumnLineageUtils;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.hook.HiveHookIT;
 import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasStruct;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.utils.AuthenticationUtil;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
 
-import static org.apache.atlas.AtlasClient.NAME;
-import static org.apache.atlas.hive.hook.HiveHook.lower;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
@@ -51,21 +75,27 @@ import static org.testng.Assert.fail;
 public class HiveITBase {
     private static final Logger LOG = 
LoggerFactory.getLogger(HiveITBase.class);
 
-    protected static final String DGI_URL = "http://localhost:21000/";;
+    public    static final String DEFAULT_DB   = "default";
+    public    static final String SEP          = ":".intern();
+    public    static final String IO_SEP       = "->".intern();
+    protected static final String DGI_URL      = "http://localhost:21000/";;
     protected static final String CLUSTER_NAME = "primary";
-    public static final String DEFAULT_DB = "default";
+    protected static final String PART_FILE    = "2015-01-01";
+    protected static final String INPUTS       = "inputs";;
+    protected static final String OUTPUTS      = "outputs";
 
-    protected static final String PART_FILE = "2015-01-01";
-    protected Driver driver;
-    protected AtlasClient atlasClient;
+
+    protected Driver              driver;
+    protected AtlasClient         atlasClient;
+    protected AtlasClientV2       atlasClientV2;
     protected HiveMetaStoreBridge hiveMetaStoreBridge;
-    protected SessionState ss;
+    protected SessionState        ss;
+    protected HiveConf            conf;
+    protected Driver              driverWithoutContext;
 
-    protected HiveConf conf;
+    private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
+    private static final String ATTR_NAME                    = "name";
 
-    protected static final String INPUTS = 
AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
-    protected static final String OUTPUTS = 
AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
-    protected Driver driverWithoutContext;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -86,12 +116,15 @@ public class HiveITBase {
         }
 
         if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+            atlasClientV2 = new AtlasClientV2(atlasEndPoint, new 
String[]{"admin", "admin"});
             atlasClient = new AtlasClient(atlasEndPoint, new String[]{"admin", 
"admin"});
         } else {
+            atlasClientV2 = new AtlasClientV2(atlasEndPoint);
             atlasClient = new AtlasClient(atlasEndPoint);
+
         }
 
-        hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, 
atlasClient);
+        hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, 
atlasClientV2);
 
         HiveConf conf = new HiveConf();
         conf.set("hive.exec.post.hooks", "");
@@ -115,7 +148,6 @@ public class HiveITBase {
 
     protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) 
throws Exception {
         LOG.debug("Running command '{}'", cmd);
-        ss.setCommandType(null);
         CommandProcessorResponse response = driver.run(cmd);
         assertEquals(response.getResponseCode(), 0);
         if (sleepMs != 0) {
@@ -127,6 +159,13 @@ public class HiveITBase {
         return "pfile://" + mkdir(path);
     }
 
+    /**
+     * Creates a scratch file named {@code <tag>-data-<random>} under the
+     * {@code target} directory of the JVM's {@code user.dir}.
+     *
+     * @param tag prefix used in the generated file name
+     * @return the absolute path of the file
+     * @throws Exception whatever {@link File#createNewFile()} raises
+     */
+    protected String file(String tag) throws Exception {
+        final String workingDir = System.getProperty("user.dir");
+        final File   dataFile   = new File(workingDir + "/target/" + tag + "-data-" + random());
+
+        // The boolean result is intentionally not inspected.
+        dataFile.createNewFile();
+
+        return dataFile.getAbsolutePath();
+    }
+
     protected String mkdir(String tag) throws Exception {
         String filename = "./target/" + tag + "-data-" + random();
         File file = new File(filename);
@@ -134,6 +173,13 @@ public class HiveITBase {
         return file.getAbsolutePath();
     }
 
+    /**
+     * Lower-cases and trims the given string.
+     *
+     * @param str the text to normalize; may be {@code null}
+     * @return {@code null} when {@code str} is {@code null} or empty, otherwise
+     *         the lower-cased, trimmed text
+     */
+    public static String lower(String str) {
+        return StringUtils.isEmpty(str) ? null : str.toLowerCase().trim();
+    }
+
     protected String random() {
         return RandomStringUtils.randomAlphanumeric(10);
     }
@@ -149,28 +195,48 @@ public class HiveITBase {
     protected String assertTableIsRegistered(String dbName, String tableName, 
HiveHookIT.AssertPredicate assertPredicate, boolean isTemporary) throws 
Exception {
         LOG.debug("Searching for table {}.{}", dbName, tableName);
         String tableQualifiedName = 
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, 
isTemporary);
-        return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
+        return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), 
REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
                 assertPredicate);
     }
 
     protected String assertEntityIsRegistered(final String typeName, final 
String property, final String value,
-                                            final HiveHookIT.AssertPredicate 
assertPredicate) throws Exception {
-        waitFor(1000, new HiveHookIT.Predicate() {
+                                              final HiveHookIT.AssertPredicate 
assertPredicate) throws Exception {
+        waitFor(80000, new HiveHookIT.Predicate() {
             @Override
             public void evaluate() throws Exception {
-                Referenceable entity = atlasClient.getEntity(typeName, 
property, value);
+                AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = 
atlasClientV2.getEntityByAttribute(typeName, 
Collections.singletonMap(property,value));
+                AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
                 assertNotNull(entity);
                 if (assertPredicate != null) {
                     assertPredicate.assertOnEntity(entity);
                 }
             }
         });
-        Referenceable entity = atlasClient.getEntity(typeName, property, 
value);
-        return entity.getId()._getId();
+        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = 
atlasClientV2.getEntityByAttribute(typeName, 
Collections.singletonMap(property,value));
+        AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
+        return (String) entity.getGuid();
+    }
+
+    /**
+     * Waits until an entity of the given type with {@code property == value} can be
+     * fetched through {@code atlasClientV2} and, when supplied, satisfies
+     * {@code assertPredicate}; then fetches the entity once more and returns it.
+     *
+     * @param typeName        Atlas type name to look up
+     * @param property        attribute name used for the lookup
+     * @param value           expected attribute value
+     * @param assertPredicate optional extra check run on each fetched entity; may be {@code null}
+     * @return the entity as returned by the final lookup
+     * @throws Exception propagated from the wait helper or the Atlas client
+     */
+    protected AtlasEntity assertEntityIsRegistedViaEntity(final String typeName, final String property, final String value,
+                                                          final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
+        // 80000 is handed to waitFor as-is; presumably a timeout in milliseconds (confirm against waitFor).
+        waitFor(80000, new HiveHookIT.Predicate() {
+            @Override
+            public void evaluate() throws Exception {
+                AtlasEntity found = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value)).getEntity();
+
+                assertNotNull(found);
+
+                if (assertPredicate == null) {
+                    return;
+                }
+
+                assertPredicate.assertOnEntity(found);
+            }
+        });
+
+        return atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(property, value)).getEntity();
     }
 
     public interface AssertPredicate {
-        void assertOnEntity(Referenceable entity) throws Exception;
+        void assertOnEntity(AtlasEntity entity) throws Exception;
     }
 
     public interface Predicate {
@@ -209,28 +275,30 @@ public class HiveITBase {
 
     protected String getTableProcessQualifiedName(String dbName, String 
tableName) throws Exception {
         return HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME,
-                hiveMetaStoreBridge.hiveClient.getTable(dbName, tableName));
+                hiveMetaStoreBridge.getHiveClient().getTable(dbName, 
tableName));
     }
 
-    protected void validateHDFSPaths(Referenceable processReference, String 
attributeName, String... testPaths) throws Exception {
-        List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
+    protected void validateHDFSPaths(AtlasEntity processEntity, String 
attributeName, String... testPaths) throws Exception {
+        List<AtlasObjectId> hdfsPathIds = 
toAtlasObjectIdList(processEntity.getAttribute(attributeName));
 
         for (String testPath : testPaths) {
-            final Path path = new Path(testPath);
-            final String testPathNormed = lower(path.toString());
-            String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
-            Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
+            Path   path           = new Path(testPath);
+            String testPathNormed = lower(path.toString());
+            String hdfsPathId     = assertHDFSPathIsRegistered(testPathNormed);
 
-            Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
-            Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
-            Assert.assertEquals(hdfsPathRef.get(NAME), 
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
-            
Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), 
testPathNormed);
+            Assert.assertEquals(hdfsPathIds.get(0).getGuid(), hdfsPathId);
         }
     }
 
-    private String assertHDFSPathIsRegistered(String path) throws Exception {
+    protected String assertHDFSPathIsRegistered(String path) throws Exception {
         LOG.debug("Searching for hdfs path {}", path);
-        return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, path, null);
+        // ATLAS-2444 HDFS name node federation adds the cluster name to the 
qualifiedName
+        if (path.startsWith("hdfs://")) {
+            String pathWithCluster = path + "@" + CLUSTER_NAME;
+            return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, 
REFERENCEABLE_ATTRIBUTE_NAME, pathWithCluster, null);
+        } else {
+            return assertEntityIsRegistered(HiveMetaStoreBridge.HDFS_PATH, 
REFERENCEABLE_ATTRIBUTE_NAME, path, null);
+        }
     }
 
     protected String assertDatabaseIsRegistered(String dbName) throws 
Exception {
@@ -240,7 +308,419 @@ public class HiveITBase {
     protected String assertDatabaseIsRegistered(String dbName, AssertPredicate 
assertPredicate) throws Exception {
         LOG.debug("Searching for database {}", dbName);
         String dbQualifiedName = 
HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
-        return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+        return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), 
REFERENCEABLE_ATTRIBUTE_NAME,
                 dbQualifiedName, assertPredicate);
     }
-}
+
+
+    /**
+     * Fetches an entity of the given type through {@code atlasClientV2}, using the
+     * attribute named by {@code AtlasClient.GUID} as the lookup key.
+     *
+     * @param type Atlas type name
+     * @param id   value matched against the {@code AtlasClient.GUID} attribute
+     * @return the entity carried by the client's response
+     * @throws Exception propagated from the Atlas client
+     */
+    protected AtlasEntity getAtlasEntityByType(String type, String id) throws Exception {
+        return atlasClientV2.getEntityByAttribute(type, Collections.singletonMap(AtlasClient.GUID, id)).getEntity();
+    }
+
+
+    /**
+     * Mutable holder for the details of one Hive operation: its read/write
+     * entities, user, operation type, hook type, query text and timing, optional
+     * column lineage, and any notification messages collected for it.
+     *
+     * <p>All fields start out {@code null} (except the message list, which starts
+     * empty) and are populated through the setters.
+     */
+    public static class HiveEventContext {
+        private Set<ReadEntity> inputs;
+        private Set<WriteEntity> outputs;
+
+        private String user;
+        private UserGroupInformation ugi;
+        private HiveOperation operation;
+        private HookContext.HookType hookType;
+        private JSONObject jsonPlan;
+        private String queryId;
+        private String queryStr;
+        private Long queryStartTime;
+
+        /** Column lineage built by {@link #setLineageInfo(LineageInfo)}; stays unset if the build fails. */
+        public Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo;
+
+        private List<HookNotificationMessage> messages = new ArrayList<>();
+
+        public void setInputs(Set<ReadEntity> inputs) {
+            this.inputs = inputs;
+        }
+
+        public void setOutputs(Set<WriteEntity> outputs) {
+            this.outputs = outputs;
+        }
+
+        public void setUser(String user) {
+            this.user = user;
+        }
+
+        public void setUgi(UserGroupInformation ugi) {
+            this.ugi = ugi;
+        }
+
+        public void setOperation(HiveOperation operation) {
+            this.operation = operation;
+        }
+
+        public void setHookType(HookContext.HookType hookType) {
+            this.hookType = hookType;
+        }
+
+        public void setQueryId(String queryId) {
+            this.queryId = queryId;
+        }
+
+        public void setQueryStr(String queryStr) {
+            this.queryStr = queryStr;
+        }
+
+        public void setQueryStartTime(Long queryStartTime) {
+            this.queryStartTime = queryStartTime;
+        }
+
+        /**
+         * Builds the column lineage map from Hive's lineage info and stores it in
+         * {@link #lineageInfo}. Any failure is logged and otherwise ignored, so
+         * the field keeps its previous value.
+         *
+         * @param lineageInfo lineage information handed to {@code ColumnLineageUtils.buildLineageMap}
+         */
+        public void setLineageInfo(LineageInfo lineageInfo){
+            try {
+                this.lineageInfo = ColumnLineageUtils.buildLineageMap(lineageInfo);
+                LOG.debug("Column Lineage Map => {} ", this.lineageInfo.entrySet());
+            } catch (Throwable e) {
+                // Best effort by design: lineage is optional, so the failure is only logged.
+                // The exception is passed as the trailing Throwable argument with no "{}" in
+                // the message; a placeholder would be left unfilled and printed literally.
+                LOG.warn("Column Lineage Map build failed with exception", e);
+            }
+        }
+
+        public Set<ReadEntity> getInputs() {
+            return inputs;
+        }
+
+        public Set<WriteEntity> getOutputs() {
+            return outputs;
+        }
+
+        public String getUser() {
+            return user;
+        }
+
+        public UserGroupInformation getUgi() {
+            return ugi;
+        }
+
+        public HiveOperation getOperation() {
+            return operation;
+        }
+
+        public HookContext.HookType getHookType() {
+            return hookType;
+        }
+
+        public String getQueryId() {
+            return queryId;
+        }
+
+        public String getQueryStr() {
+            return queryStr;
+        }
+
+        public Long getQueryStartTime() {
+            return queryStartTime;
+        }
+
+        /** Appends a notification message to this context's message list. */
+        public void addMessage(HookNotificationMessage message) {
+            messages.add(message);
+        }
+
+        /** Returns the live (not copied) list of messages added so far. */
+        public List<HookNotificationMessage> getMessages() {
+            return messages;
+        }
+    }
+
+
+    /**
+     * Computes the qualified name of the process entity for a Hive operation.
+     *
+     * <p>For create-style operations (see {@link #isCreateOp}) that have a TABLE
+     * output, the table is re-read from the Hive client and the table-based
+     * process qualified name is returned. Otherwise the name is the operation
+     * name, followed by the inputs, {@code IO_SEP}, and the outputs.
+     *
+     * @param dgiBridge         bridge supplying the Hive client and cluster name
+     * @param eventContext      context supplying the operation and query string
+     * @param sortedHiveInputs  inputs in the order they should appear in the name
+     * @param sortedHiveOutputs outputs in the order they should appear in the name
+     * @param hiveInputsMap     Atlas entity for each input; inputs without an entry are left out
+     * @param hiveOutputsMap    Atlas entity for each output; outputs without an entry are left out
+     * @return the process qualified name
+     * @throws HiveException if the Hive client lookup fails
+     */
+    @VisibleForTesting
+    protected static String getProcessQualifiedName(HiveMetaStoreBridge dgiBridge, HiveEventContext eventContext,
+                                                    final SortedSet<ReadEntity> sortedHiveInputs,
+                                                    final SortedSet<WriteEntity> sortedHiveOutputs,
+                                                    SortedMap<ReadEntity, AtlasEntity> hiveInputsMap,
+                                                    SortedMap<WriteEntity, AtlasEntity> hiveOutputsMap) throws HiveException {
+        final HiveOperation operation = eventContext.getOperation();
+
+        if (isCreateOp(eventContext)) {
+            final Entity createdTable = getEntityByType(sortedHiveOutputs, Entity.Type.TABLE);
+
+            if (createdTable != null) {
+                final Table staleTable = createdTable.getTable();
+                // re-read the table from the Hive client before deriving the name
+                final Table freshTable = dgiBridge.getHiveClient().getTable(staleTable.getDbName(), staleTable.getTableName());
+
+                return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(), freshTable);
+            }
+        }
+
+        final StringBuilder qualifiedName = new StringBuilder(operation.getOperationName());
+        final boolean       skipHdfsPaths = ignoreHDFSPathsinQFName(operation, sortedHiveInputs, sortedHiveOutputs);
+
+        if (skipHdfsPaths && LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", operation, eventContext.getQueryStr());
+        }
+
+        addInputs(dgiBridge, operation, sortedHiveInputs, qualifiedName, hiveInputsMap, skipHdfsPaths);
+        qualifiedName.append(IO_SEP);
+        addOutputs(dgiBridge, operation, sortedHiveOutputs, qualifiedName, hiveOutputsMap, skipHdfsPaths);
+
+        LOG.info("Setting process qualified name to {}", qualifiedName);
+
+        return qualifiedName.toString();
+    }
+
+
+    /**
+     * Returns the first entity, in iteration order, whose type is {@code entityType}.
+     *
+     * @param entities   entities to scan
+     * @param entityType type to look for
+     * @return the first match, or {@code null} when none is found
+     */
+    protected static Entity getEntityByType(Set<? extends Entity> entities, Entity.Type entityType) {
+        Entity match = null;
+
+        for (Entity candidate : entities) {
+            if (candidate.getType() == entityType) {
+                match = candidate;
+                break;
+            }
+        }
+
+        return match;
+    }
+
+
+    /**
+     * Decides whether directory entities are left out of a process qualified name.
+     *
+     * <ul>
+     *   <li>LOAD / IMPORT: only when an output is a partition</li>
+     *   <li>EXPORT: only when an input is a partition</li>
+     *   <li>QUERY: always</li>
+     *   <li>any other operation: never</li>
+     * </ul>
+     *
+     * @param op      the Hive operation
+     * @param inputs  entities read by the operation
+     * @param outputs entities written by the operation
+     * @return {@code true} if directory paths should be ignored
+     */
+    protected static boolean ignoreHDFSPathsinQFName(final HiveOperation op, final Set<ReadEntity> inputs, final Set<WriteEntity> outputs) {
+        final boolean ignore;
+
+        switch (op) {
+            case LOAD:
+            case IMPORT:
+                ignore = isPartitionBasedQuery(outputs);
+                break;
+            case EXPORT:
+                ignore = isPartitionBasedQuery(inputs);
+                break;
+            case QUERY:
+                ignore = true;
+                break;
+            default:
+                ignore = false;
+                break;
+        }
+
+        return ignore;
+    }
+
+    /**
+     * Tells whether any of the given entities is a partition.
+     *
+     * @param entities entities to inspect
+     * @return {@code true} if at least one entity has type {@code PARTITION}
+     */
+    protected static boolean isPartitionBasedQuery(Set<? extends Entity> entities) {
+        boolean partitionSeen = false;
+
+        for (Entity candidate : entities) {
+            if (candidate.getType() == Entity.Type.PARTITION) {
+                partitionSeen = true;
+                break;
+            }
+        }
+
+        return partitionSeen;
+    }
+
+    /**
+     * Tells whether the event's operation is one of CREATETABLE, CREATEVIEW,
+     * ALTERVIEW_AS, ALTERTABLE_LOCATION or CREATETABLE_AS_SELECT.
+     *
+     * @param hiveEvent the event whose operation is checked
+     * @return {@code true} for the operations listed above; {@code false}
+     *         otherwise, including when the operation is {@code null}
+     */
+    protected static boolean isCreateOp(HiveEventContext hiveEvent) {
+        final HiveOperation operation = hiveEvent.getOperation();
+
+        return operation == HiveOperation.CREATETABLE
+                || operation == HiveOperation.CREATEVIEW
+                || operation == HiveOperation.ALTERVIEW_AS
+                || operation == HiveOperation.ALTERTABLE_LOCATION
+                || operation == HiveOperation.CREATETABLE_AS_SELECT;
+    }
+
+    /**
+     * Appends the input datasets of an operation to a process qualified name.
+     *
+     * <p>Does nothing when {@code refs} or {@code sortedInputs} is {@code null}.
+     * Each distinct lower-cased input name is handled once. Directory inputs are
+     * skipped when {@code ignoreHDFSPathsInQFName} is set, and inputs with no
+     * entry in {@code refs} are skipped. Tables and partitions are appended with
+     * the table's created time, but only if the table can be re-read.
+     *
+     * @param hiveBridge              bridge used to re-read tables
+     * @param op                      the Hive operation (not consulted here)
+     * @param sortedInputs            inputs in the order they should be appended
+     * @param buffer                  qualified name under construction
+     * @param refs                    Atlas entity for each input
+     * @param ignoreHDFSPathsInQFName whether DFS_DIR / LOCAL_DIR inputs are left out
+     * @throws HiveException declared by the signature
+     */
+    protected static void addInputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, AtlasEntity> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException {
+        if (refs == null || sortedInputs == null) {
+            return;
+        }
+
+        final Set<String> seenNames = new LinkedHashSet<>();
+
+        for (Entity input : sortedInputs) {
+            // add() reports false for a name that was already handled
+            if (!seenNames.add(input.getName().toLowerCase())) {
+                continue;
+            }
+
+            final Entity.Type inputType   = input.getType();
+            final boolean     isDirectory = inputType == Entity.Type.DFS_DIR || inputType == Entity.Type.LOCAL_DIR;
+
+            //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
+            if (ignoreHDFSPathsInQFName && isDirectory) {
+                LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
+                continue;
+            }
+
+            if (!refs.containsKey(input)) {
+                continue;
+            }
+
+            if (inputType != Entity.Type.PARTITION && inputType != Entity.Type.TABLE) {
+                addDataset(buffer, refs.get(input));
+                continue;
+            }
+
+            final Table inputTable = refreshTable(hiveBridge, input.getTable().getDbName(), input.getTable().getTableName());
+
+            if (inputTable != null) {
+                addDataset(buffer, refs.get(input), HiveMetaStoreBridge.getTableCreatedTime(inputTable));
+            }
+        }
+    }
+
+    /**
+     * Appends a dataset and its creation time to the qualified name, in the form
+     * {@code SEP + <dataset qualified name> + SEP + createTime}.
+     *
+     * @param buffer     qualified name under construction
+     * @param ref        entity whose qualified name is appended
+     * @param createTime creation time appended after the dataset
+     */
+    protected static void addDataset(StringBuilder buffer, AtlasEntity ref, final long createTime) {
+        addDataset(buffer, ref);
+        buffer.append(SEP).append(createTime);
+    }
+
+    /**
+     * Appends {@code SEP} followed by the entity's qualified name, lower-cased
+     * and with every {@code '/'} removed.
+     *
+     * @param buffer qualified name under construction
+     * @param ref    entity whose {@code AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME} attribute is appended
+     */
+    protected static void addDataset(StringBuilder buffer, AtlasEntity ref) {
+        buffer.append(SEP);
+
+        final Object qualifiedName = ref.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+
+        // '/' breaks query parsing on ATLAS
+        buffer.append(((String) qualifiedName).toLowerCase().replace("/", ""));
+    }
+
+    /**
+     * Appends the output datasets of an operation to a process qualified name.
+     *
+     * <p>Does nothing when {@code refs} or {@code sortedOutputs} is {@code null}.
+     * Each distinct lower-cased output name is handled once. Directory outputs
+     * are skipped when {@code ignoreHDFSPathsInQFName} is set, and outputs with
+     * no entry in {@code refs} are skipped. When {@link #addQueryType} says so,
+     * the write type is appended before the dataset. Tables and partitions are
+     * appended with the table's created time, but only if the table can be re-read.
+     *
+     * @param hiveBridge              bridge used to re-read tables
+     * @param op                      the Hive operation, consulted for the write-type prefix
+     * @param sortedOutputs           outputs in the order they should be appended
+     * @param buffer                  qualified name under construction
+     * @param refs                    Atlas entity for each output
+     * @param ignoreHDFSPathsInQFName whether DFS_DIR / LOCAL_DIR outputs are left out
+     * @throws HiveException declared by the signature
+     */
+    protected static void addOutputs(HiveMetaStoreBridge hiveBridge, HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, AtlasEntity> refs, final boolean ignoreHDFSPathsInQFName) throws HiveException {
+        if (refs == null || sortedOutputs == null) {
+            return;
+        }
+
+        final Set<String> seenNames = new LinkedHashSet<>();
+
+        for (WriteEntity output : sortedOutputs) {
+            // add() reports false for a name that was already handled
+            if (!seenNames.add(output.getName().toLowerCase())) {
+                continue;
+            }
+
+            final Entity.Type outputType  = output.getType();
+            final boolean     isDirectory = outputType == Entity.Type.DFS_DIR || outputType == Entity.Type.LOCAL_DIR;
+
+            if (ignoreHDFSPathsInQFName && isDirectory) {
+                LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
+                continue;
+            }
+
+            if (!refs.containsKey(output)) {
+                continue;
+            }
+
+            //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
+            if (addQueryType(op, output)) {
+                buffer.append(SEP);
+                buffer.append(output.getWriteType().name());
+            }
+
+            if (outputType != Entity.Type.PARTITION && outputType != Entity.Type.TABLE) {
+                addDataset(buffer, refs.get(output));
+                continue;
+            }
+
+            final Table outputTable = refreshTable(hiveBridge, output.getTable().getDbName(), output.getTable().getTableName());
+
+            if (outputTable != null) {
+                addDataset(buffer, refs.get(output), HiveMetaStoreBridge.getTableCreatedTime(outputTable));
+            }
+        }
+    }
+
+    /**
+     * Re-reads a table through the bridge's Hive client.
+     *
+     * @param dgiBridge bridge supplying the Hive client
+     * @param dbName    database name
+     * @param tableName table name
+     * @return the table, or {@code null} when the lookup throws a {@link HiveException}
+     *         (the failure is logged as a warning)
+     */
+    protected static Table refreshTable(HiveMetaStoreBridge dgiBridge, String dbName, String tableName) {
+        Table refreshed = null;
+
+        try {
+            refreshed = dgiBridge.getHiveClient().getTable(dbName, tableName);
+        } catch (HiveException excp) { // this might be the case for temp tables
+            LOG.warn("failed to get details for table {}.{}. Ignoring. {}: {}", dbName, tableName, excp.getClass().getCanonicalName(), excp.getMessage());
+        }
+
+        return refreshed;
+    }
+
+    /**
+     * Decides whether an output's write type is added to the process qualified name.
+     *
+     * <p>This applies only to QUERY operations on outputs that carry a write type:
+     * INSERT, INSERT_OVERWRITE, UPDATE and DELETE always qualify, and PATH_WRITE
+     * qualifies unless the output is a local directory.
+     *
+     * @param op     the Hive operation
+     * @param entity the output being appended
+     * @return {@code true} if the write type should be appended
+     */
+    protected static boolean addQueryType(HiveOperation op, WriteEntity entity) {
+        if (entity.getWriteType() == null || !HiveOperation.QUERY.equals(op)) {
+            return false;
+        }
+
+        switch (entity.getWriteType()) {
+            case INSERT:
+            case INSERT_OVERWRITE:
+            case UPDATE:
+            case DELETE:
+                return true;
+            case PATH_WRITE:
+                //Add query type only for DFS paths and ignore local paths since they are not added as outputs
+                return !Entity.Type.LOCAL_DIR.equals(entity.getType());
+            default:
+                return false;
+        }
+    }
+
+
+    /**
+     * Orders Hive entities by lower-cased name. When either name is {@code null},
+     * both sides are compared by the string form of {@code getD()} instead.
+     */
+    @VisibleForTesting
+    protected static final class EntityComparator implements Comparator<Entity> {
+        @Override
+        public int compare(Entity o1, Entity o2) {
+            String left  = o1.getName();
+            String right = o2.getName();
+
+            final boolean nameMissing = left == null || right == null;
+
+            if (nameMissing) {
+                left  = o1.getD().toString();
+                right = o2.getD().toString();
+            }
+
+            return left.toLowerCase().compareTo(right.toLowerCase());
+        }
+    }
+
+    /** Shared instance of {@link EntityComparator}. */
+    @VisibleForTesting
+    protected static final Comparator<Entity> entityComparator = new EntityComparator();
+
+    /**
+     * Converts an attribute value into an {@link AtlasObjectId}.
+     *
+     * @param obj an {@code AtlasObjectId} (returned as is), a {@code Map} (passed to
+     *            the {@code AtlasObjectId} map constructor), any other non-null
+     *            object (its string form is used as the guid), or {@code null}
+     * @return the object id, or {@code null} when {@code obj} is {@code null}
+     */
+    protected AtlasObjectId toAtlasObjectId(Object obj) {
+        if (obj instanceof AtlasObjectId) {
+            return (AtlasObjectId) obj;
+        }
+
+        if (obj instanceof Map) {
+            return new AtlasObjectId((Map) obj);
+        }
+
+        if (obj == null) {
+            return null;
+        }
+
+        return new AtlasObjectId(obj.toString()); // guid
+    }
+
+    /**
+     * Converts an attribute value into a list of {@link AtlasObjectId}s.
+     *
+     * @param obj a collection (each element is converted with
+     *            {@link #toAtlasObjectId(Object)}; elements that convert to
+     *            {@code null} are dropped) or a single value
+     * @return for a collection, a possibly empty list; for a single value, a
+     *         one-element list, or {@code null} when the value converts to {@code null}
+     */
+    protected List<AtlasObjectId> toAtlasObjectIdList(Object obj) {
+        if (obj instanceof Collection) {
+            final Collection          items = (Collection) obj;
+            final List<AtlasObjectId> ids   = new ArrayList<>(items.size());
+
+            for (Object item : items) {
+                final AtlasObjectId converted = toAtlasObjectId(item);
+
+                if (converted != null) {
+                    ids.add(converted);
+                }
+            }
+
+            return ids;
+        }
+
+        final AtlasObjectId single = toAtlasObjectId(obj);
+
+        if (single == null) {
+            return null;
+        }
+
+        final List<AtlasObjectId> ids = new ArrayList<>(1);
+
+        ids.add(single);
+
+        return ids;
+    }
+
+
+    /**
+     * Converts an attribute value into an {@link AtlasStruct}.
+     *
+     * <p>A {@code Map} becomes a struct whose type name is the string form of its
+     * {@code "typeName"} entry (empty string when absent). Its attributes are the
+     * nested {@code "attributes"} map if that entry is a map, otherwise the map itself.
+     *
+     * @param obj an {@code AtlasStruct} (returned as is), a {@code Map}, or anything else
+     * @return the struct, or {@code null} when {@code obj} is neither a struct nor a map
+     */
+    protected AtlasStruct toAtlasStruct(Object obj) {
+        if (obj instanceof AtlasStruct) {
+            return (AtlasStruct) obj;
+        }
+
+        if (!(obj instanceof Map)) {
+            return null;
+        }
+
+        final Map    source     = (Map) obj;
+        final Object typeName   = source.get("typeName");
+        final Object nested     = source.get("attributes");
+        final Map    attributes = (nested instanceof Map) ? (Map) nested : source;
+
+        return new AtlasStruct(typeName == null ? "" : typeName.toString(), attributes);
+    }
+
+    /**
+     * Converts an attribute value into a list of {@link AtlasStruct}s.
+     *
+     * @param obj a collection (each element is converted with
+     *            {@link #toAtlasStruct(Object)}; elements that convert to
+     *            {@code null} are dropped) or a single value
+     * @return for a collection, a possibly empty list; for a single value, a
+     *         one-element list, or {@code null} when the value converts to {@code null}
+     */
+    protected List<AtlasStruct> toAtlasStructList(Object obj) {
+        if (obj instanceof Collection) {
+            final Collection        items   = (Collection) obj;
+            final List<AtlasStruct> structs = new ArrayList<>(items.size());
+
+            for (Object item : items) {
+                final AtlasStruct converted = toAtlasStruct(item);
+
+                if (converted != null) {
+                    structs.add(converted);
+                }
+            }
+
+            return structs;
+        }
+
+        final AtlasStruct single = toAtlasStruct(obj);
+
+        if (single == null) {
+            return null;
+        }
+
+        final List<AtlasStruct> structs = new ArrayList<>(1);
+
+        structs.add(single);
+
+        return structs;
+    }}

http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
 
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
new file mode 100644
index 0000000..dc14480
--- /dev/null
+++ 
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.hive.hook.events.BaseHiveEvent;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test-side helpers that turn Hive's {@link LineageInfo} into a per-output-column list of
+ * inputs, and that index table/column {@link Referenceable}s by qualified name with the
+ * "@cluster" suffix removed.
+ */
+public class ColumnLineageUtils {
+    public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class);
+
+    /** One input of an output column: its qualified name plus Hive's dependency type and expression. */
+    public static class HiveColumnLineageInfo {
+        // NOTE: the misspelling is intentional to keep; the field is public and renaming it would break users.
+        public final String depenendencyType;
+        public final String expr;
+        public final String inputColumn;
+
+        HiveColumnLineageInfo(LineageInfo.Dependency d, String inputCol) {
+            depenendencyType = d.getType().name();
+            expr = d.getExpr();
+            inputColumn = inputCol;
+        }
+
+        @Override
+        public String toString(){
+            return inputColumn;
+        }
+    }
+
+    /**
+     * Builds the "db.table.column" name of the column identified by a lineage dependency key.
+     *
+     * @param key dependency key of an output column
+     * @return the dot-separated qualified name, without any cluster suffix
+     */
+    public static String getQualifiedName(LineageInfo.DependencyKey key){
+        String db = key.getDataContainer().getTable().getDbName();
+        String table = key.getDataContainer().getTable().getTableName();
+        String col = key.getFieldSchema().getName();
+        return db + "." + table + "." + col;
+    }
+
+    /**
+     * Maps every output column in {@code lInfo} to the inputs it is derived from.
+     *
+     * An input without a column is recorded as "db.table"; otherwise as "db.table.column".
+     * Output columns for which no base-column collection could be obtained are left out.
+     *
+     * @param lInfo Hive lineage information; {@code null} yields an empty map
+     * @return map from output column qualified name to its inputs, never {@code null}
+     */
+    public static Map<String, List<HiveColumnLineageInfo>> buildLineageMap(LineageInfo lInfo) {
+        Map<String, List<HiveColumnLineageInfo>> m = new HashMap<>();
+
+        if (lInfo == null) {
+            return m;
+        }
+
+        for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) {
+            String k = getQualifiedName(e.getKey());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("buildLineageMap(): key={}; value={}", e.getKey(), e.getValue());
+            }
+
+            Collection<LineageInfo.BaseColumnInfo> baseCols = getBaseCols(e.getValue());
+
+            if (baseCols == null) {
+                continue;
+            }
+
+            List<HiveColumnLineageInfo> l = new ArrayList<>(baseCols.size());
+
+            for (LineageInfo.BaseColumnInfo iCol : baseCols) {
+                String db = iCol.getTabAlias().getTable().getDbName();
+                String table = iCol.getTabAlias().getTable().getTableName();
+                String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName();
+                l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName));
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k);
+            }
+            m.put(k, l);
+        }
+        return m;
+    }
+
+    /**
+     * Reads the base columns of a dependency by invoking its {@code getBaseCols()} reflectively.
+     *
+     * NOTE(review): reflection is presumably used because the declared return type of
+     * getBaseCols() differs between Hive versions - confirm.
+     *
+     * @param lInfoDep dependency to inspect, may be {@code null}
+     * @return the base columns, or {@code null} when the dependency is null, the call fails,
+     *         or the result is not a {@link Collection}
+     */
+    static Collection<LineageInfo.BaseColumnInfo> getBaseCols(LineageInfo.Dependency lInfoDep) {
+        Collection<LineageInfo.BaseColumnInfo> ret = null;
+
+        if (lInfoDep != null) {
+            try {
+                Method getBaseColsMethod = lInfoDep.getClass().getMethod("getBaseCols");
+
+                Object retGetBaseCols = getBaseColsMethod.invoke(lInfoDep);
+
+                if (retGetBaseCols != null) {
+                    if (retGetBaseCols instanceof Collection) {
+                        // Element type cannot be checked at runtime; only the container type is verified.
+                        @SuppressWarnings("unchecked")
+                        Collection<LineageInfo.BaseColumnInfo> cols = (Collection<LineageInfo.BaseColumnInfo>) retGetBaseCols;
+
+                        ret = cols;
+                    } else {
+                        LOG.warn("{}: unexpected return type from LineageInfo.Dependency.getBaseCols(), expected type {}",
+                                retGetBaseCols.getClass().getName(), "Collection");
+                    }
+                }
+            } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+                LOG.warn("getBaseCols()", ex);
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Splits a qualified name such as "db.table.column@cluster" into its dot-separated parts,
+     * dropping the "@cluster" suffix.
+     *
+     * @param qualifiedName qualified name, with or without a cluster suffix
+     * @return the components that precede the '@'
+     */
+    static String[] extractComponents(String qualifiedName) {
+        // Cut the cluster suffix off the whole name before splitting: splitting first would
+        // scatter a dotted cluster name (e.g. "db.tbl@my.cluster") across several components
+        // and leave the '@' inside the table/column part.
+        int atLoc = qualifiedName.indexOf('@');
+        String withoutCluster = atLoc > 0 ? qualifiedName.substring(0, atLoc) : qualifiedName;
+
+        return withoutCluster.split("\\.");
+    }
+
+    /**
+     * Indexes a hive_table {@link Referenceable} and its columns by cluster-less qualified name
+     * ("db.table" and "db.table.column"). Referenceables of any other type are ignored.
+     *
+     * @param m map to add the entries to
+     * @param r candidate table
+     */
+    static void populateColumnReferenceableMap(Map<String, Referenceable> m,
+                                               Referenceable r) {
+        if (!HiveDataTypes.HIVE_TABLE.getName().equals(r.getTypeName())) {
+            return;
+        }
+
+        String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+        String[] qNameComps = extractComponents(qName);
+
+        @SuppressWarnings("unchecked")
+        List<Referenceable> columns = (List<Referenceable>) r.get(BaseHiveEvent.ATTRIBUTE_COLUMNS);
+
+        // A table without a columns value still gets its own entry below instead of failing here.
+        if (columns != null) {
+            for (Referenceable col : columns) {
+                String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+                String[] colQNameComps = extractComponents(cName);
+                String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2];
+                m.put(colQName, col);
+            }
+        }
+
+        String tableQName = qNameComps[0] + "." + qNameComps[1];
+        m.put(tableQName, r);
+    }
+
+
+    /**
+     * Builds a lookup of all tables and columns found in the given process inputs and outputs.
+     * Outputs are added after inputs, so an output replaces an input with the same name.
+     *
+     * @param inputs  input entities of a process
+     * @param outputs output entities of a process
+     * @return map from cluster-less qualified name to the table or column referenceable
+     */
+    public static Map<String, Referenceable> buildColumnReferenceableMap(List<Referenceable> inputs,
+                                                                         List<Referenceable> outputs) {
+        Map<String, Referenceable> m = new HashMap<>();
+
+        for (Referenceable r : inputs) {
+            populateColumnReferenceableMap(m, r);
+        }
+
+        for (Referenceable r : outputs) {
+            populateColumnReferenceableMap(m, r);
+        }
+
+        return m;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java
 
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java
deleted file mode 100644
index f4abfb6..0000000
--- 
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.hive.bridge;
-
-import org.apache.atlas.hive.hook.HiveHook;
-import org.apache.atlas.hive.rewrite.HiveASTRewriter;
-import org.apache.atlas.hive.rewrite.RewriteException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-@Test(enabled = false)
-public class HiveLiteralRewriterTest {
-
-    private HiveConf conf;
-
-    @BeforeClass(enabled = false)
-    public void setup() {
-        conf = new HiveConf();
-        conf.addResource("/hive-site.xml");
-        SessionState ss = new SessionState(conf, "testuser");
-        SessionState.start(ss);
-        conf.set("hive.lock.manager", 
"org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager");
-    }
-
-    @Test(enabled=false)
-    public void testLiteralRewrite() throws RewriteException {
-        HiveHook.HiveEventContext ctx = new HiveHook.HiveEventContext();
-        ctx.setQueryStr("insert into table testTable 
partition(dt='2014-01-01') select * from test1 where dt = '2014-01-01'" +
-            " and intColumn = 10" +
-            " and decimalColumn = 1.10" +
-            " and charColumn = 'a'" +
-            " and hexColumn = unhex('\\0xFF')" +
-            " and expColumn = cast('-1.5e2' as int)" +
-            " and boolCol = true");
-
-            HiveASTRewriter queryRewriter  = new HiveASTRewriter(conf);
-            String result = queryRewriter.rewrite(ctx.getQueryStr());
-            System.out.println("normlized sql : " + result);
-
-            final String normalizedSQL = "insert into table testTable 
partition(dt='STRING_LITERAL') " +
-                "select * from test1 where dt = 'STRING_LITERAL' " +
-                "and intColumn = NUMBER_LITERAL " +
-                "and decimalColumn = NUMBER_LITERAL and " +
-                "charColumn = 'STRING_LITERAL' and " +
-                "hexColumn = unhex('STRING_LITERAL') and " +
-                "expColumn = cast('STRING_LITERAL' as int) and " +
-                "boolCol = BOOLEAN_LITERAL";
-            Assert.assertEquals(result, normalizedSQL);
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
 
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index 0256cf3..d42182e 100644
--- 
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ 
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -19,9 +19,11 @@
 package org.apache.atlas.hive.bridge;
 
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -31,20 +33,21 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.codehaus.jettison.json.JSONException;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import scala.actors.threadpool.Arrays;
 
+import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.eq;
+import static org.apache.atlas.hive.hook.events.BaseHiveEvent.*;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -61,6 +64,18 @@ public class HiveMetaStoreBridgeTest {
     @Mock
     private AtlasClient atlasClient;
 
+    @Mock
+    private AtlasClientV2 atlasClientV2;
+
+    @Mock
+    private AtlasEntity atlasEntity;
+
+    @Mock
+    private AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo;
+
+    @Mock
+    EntityMutationResponse entityMutationResponse;
+
     @BeforeMethod
     public void initializeMocks() {
         MockitoAnnotations.initMocks(this);
@@ -71,19 +86,21 @@ public class HiveMetaStoreBridgeTest {
         // setup database
         when(hiveClient.getAllDatabases()).thenReturn(Arrays.asList(new 
String[]{TEST_DB_NAME}));
         String description = "This is a default database";
-        when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(
-                new Database(TEST_DB_NAME, description, "/user/hive/default", 
null));
+        Database db = new Database(TEST_DB_NAME, description, 
"/user/hive/default", null);
+        when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(db);
         
when(hiveClient.getAllTables(TEST_DB_NAME)).thenReturn(Arrays.asList(new 
String[]{}));
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClient);
+        
when(atlasEntityWithExtInfo.getEntity("72e06b34-9151-4023-aa9d-b82103a50e76"))
+                .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_DB.getName(), 
AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity());
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClientV2);
         bridge.importHiveMetadata(true);
 
         // verify update is called
-        
verify(atlasClient).updateEntity(eq("72e06b34-9151-4023-aa9d-b82103a50e76"),
-                (Referenceable) argThat(
-                        new 
MatchesReferenceableProperty(HiveMetaStoreBridge.DESCRIPTION_ATTR, 
description)));
+        
verify(atlasClientV2).updateEntity((AtlasEntity.AtlasEntityWithExtInfo)anyObject());
     }
 
     @Test
@@ -92,32 +109,50 @@ public class HiveMetaStoreBridgeTest {
 
         List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, 
TEST_TABLE_NAME);
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
 
         // return existing table
-        when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(),
-            AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, 
TEST_TABLE_NAME)))
-            .thenReturn(getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), 
"82e06b34-9151-4023-aa9d-b82103a50e77"));
-        
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+
+        
when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
+                .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))).getEntity());
+
+        
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
+                
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, 
TEST_TABLE_NAME))))
+                .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
+
+        
when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
+                .thenReturn(createTableReference());
+
         String processQualifiedName = 
HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, 
hiveTables.get(0));
-        when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(),
-            AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(),
 "82e06b34-9151-4023-aa9d-b82103a50e77"));
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClient);
+        
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
+                
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        processQualifiedName)))
+                .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
+
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClientV2);
         bridge.importHiveMetadata(true);
 
         // verify update is called on table
-        
verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"),
-                (Referenceable) argThat(new 
MatchesReferenceableProperty(HiveMetaStoreBridge.TABLE_TYPE_ATTR,
-                        TableType.EXTERNAL_TABLE.name())));
+        verify(atlasClientV2, 
times(2)).updateEntity((AtlasEntity.AtlasEntityWithExtInfo)anyObject());
+
     }
 
-    private void returnExistingDatabase(String databaseName, AtlasClient 
atlasClient, String clusterName)
-            throws AtlasServiceException, JSONException {
-        when(atlasClient.getEntity(
-            HiveDataTypes.HIVE_DB.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-            HiveMetaStoreBridge.getDBQualifiedName(clusterName, 
databaseName))).thenReturn(
-            getEntityReference(HiveDataTypes.HIVE_DB.getName(), 
"72e06b34-9151-4023-aa9d-b82103a50e76"));
+    /**
+     * Stubs {@code atlasClientV2} so that looking up the hive_db entity by the qualified name of
+     * {@code databaseName} on {@code clusterName} returns an entity whose
+     * {@link AtlasClient#GUID} attribute is a fixed id.
+     *
+     * @param databaseName  name of the database that should appear as already registered
+     * @param atlasClientV2 mocked client to stub
+     * @param clusterName   cluster used to build the database's qualified name
+     */
+    private void returnExistingDatabase(String databaseName, AtlasClientV2 atlasClientV2, String clusterName)
+            throws AtlasServiceException {
+        // Build the lookup key from the arguments rather than the class constants, so the stub
+        // matches whatever database/cluster the caller asked for.
+        String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, databaseName);
+
+        when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_DB.getName(),
+                Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName)))
+                .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))));
+
     }
 
     private List<Table> setupTables(Hive hiveClient, String databaseName, 
String... tableNames) throws HiveException {
@@ -143,15 +178,25 @@ public class HiveMetaStoreBridgeTest {
         List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, 
TEST_TABLE_NAME);
         Table hiveTable = hiveTables.get(0);
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+
+
+        
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
+                
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, 
TEST_TABLE_NAME))))
+                .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
 
-        when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-            HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, 
TEST_DB_NAME, TEST_TABLE_NAME))).thenReturn(
-            getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), 
"82e06b34-9151-4023-aa9d-b82103a50e77"));
         String processQualifiedName = 
HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTable);
-        when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                
processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(),
 "82e06b34-9151-4023-aa9d-b82103a50e77"));
-        
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+
+        
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
+                
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        processQualifiedName)))
+                .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
+
+        
when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
+                .thenReturn(createTableReference());
 
         Partition partition = mock(Partition.class);
         when(partition.getTable()).thenReturn(hiveTable);
@@ -160,7 +205,7 @@ public class HiveMetaStoreBridgeTest {
 
         when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new 
Partition[]{partition}));
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClient);
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClientV2);
         try {
             bridge.importHiveMetadata(true);
         } catch (Exception e) {
@@ -174,18 +219,27 @@ public class HiveMetaStoreBridgeTest {
         final String table2Name = TEST_TABLE_NAME + "_1";
         List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, 
TEST_TABLE_NAME, table2Name);
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
         when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new 
RuntimeException("Timeout while reading data from hive metastore"));
 
-        when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME,
-            table2Name))).thenReturn(
-            getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), 
"82e06b34-9151-4023-aa9d-b82103a50e77"));
-        
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+        
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
+                
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, 
TEST_TABLE_NAME))))
+                .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
+
+        
when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
+                .thenReturn(createTableReference());
+
         String processQualifiedName = 
HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, 
hiveTables.get(1));
-        when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-            
processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(),
 "82e06b34-9151-4023-aa9d-b82103a50e77"));
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClient);
+        
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
+                
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        processQualifiedName)))
+                .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClientV2);
         try {
             bridge.importHiveMetadata(false);
         } catch (Exception e) {
@@ -199,18 +253,29 @@ public class HiveMetaStoreBridgeTest {
         final String table2Name = TEST_TABLE_NAME + "_1";
         List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, 
TEST_TABLE_NAME, table2Name);
 
-        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
         when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new 
RuntimeException("Timeout while reading data from hive metastore"));
 
-        when(atlasClient.getEntity(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME,
-            table2Name))).thenReturn(
-            getEntityReference(HiveDataTypes.HIVE_TABLE.getName(), 
"82e06b34-9151-4023-aa9d-b82103a50e77"));
-        
when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
-        String processQualifiedName = 
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
-        when(atlasClient.getEntity(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-            
processQualifiedName)).thenReturn(getEntityReference(HiveDataTypes.HIVE_PROCESS.getName(),
 "82e06b34-9151-4023-aa9d-b82103a50e77"));
 
-        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClient);
+        
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
+                
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, 
TEST_TABLE_NAME))))
+                .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_TABLE.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
+
+
+        
when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77"))
+                .thenReturn(createTableReference());
+
+        String processQualifiedName = 
HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, 
hiveTables.get(1));
+
+        
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
+                
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                        processQualifiedName)))
+                .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
+                        getEntity(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, 
hiveClient, atlasClientV2);
         try {
             bridge.importHiveMetadata(true);
             Assert.fail("Table registration is supposed to fail");
@@ -219,15 +284,15 @@ public class HiveMetaStoreBridgeTest {
         }
     }
 
-    private Referenceable getEntityReference(String typeName, String id) 
throws JSONException {
-        return new Referenceable(id, typeName, null);
+    private AtlasEntity getEntity(String typeName, String attr, String value) {
+        return new AtlasEntity(typeName, attr, value);
     }
 
-    private Referenceable createTableReference() {
-        Referenceable tableReference = new 
Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-        Referenceable sdReference = new 
Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
-        tableReference.set(HiveMetaStoreBridge.STORAGE_DESC, sdReference);
-        return tableReference;
+    private AtlasEntity createTableReference() {
+        AtlasEntity tableEntity = new 
AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
+        AtlasEntity sdEntity = new 
AtlasEntity(HiveDataTypes.HIVE_STORAGEDESC.getName());
+        tableEntity.setAttribute(ATTRIBUTE_STORAGEDESC, getObjectId(sdEntity));
+        return tableEntity;
     }
 
     private Table createTestTable(String databaseName, String tableName) 
throws HiveException {
@@ -253,7 +318,7 @@ public class HiveMetaStoreBridgeTest {
 
         @Override
         public boolean matches(Object o) {
-            return attrValue.equals(((Referenceable) o).get(attrName));
+            return attrValue.equals(((AtlasEntity) o).getAttribute(attrName));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/5273ab69/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java
 
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java
index d09db1b..a5b1f4d 100644
--- 
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java
+++ 
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java
@@ -21,8 +21,8 @@ package org.apache.atlas.hive.bridge;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.hive.HiveITBase;
 import org.apache.atlas.hive.model.HiveDataTypes;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.testng.annotations.Test;
 
 import java.util.List;
@@ -34,34 +34,36 @@ public class HiveMetastoreBridgeIT extends HiveITBase {
     @Test
     public void testCreateTableAndImport() throws Exception {
         String tableName = tableName();
+        String pFile     = createTestDFSPath("parentPath");
+        String query     = String.format("create EXTERNAL table %s(id string, 
cnt int) location '%s'", tableName, pFile);
 
-        String pFile = createTestDFSPath("parentPath");
-        final String query = String.format("create EXTERNAL table %s(id 
string, cnt int) location '%s'", tableName, pFile);
         runCommand(query);
-        String dbId = assertDatabaseIsRegistered(DEFAULT_DB);
+
+        String dbId    = assertDatabaseIsRegistered(DEFAULT_DB);
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
 
         //verify lineage is created
-        String processId = 
assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
-                AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
-        Referenceable processReference = atlasClient.getEntity(processId);
-        validateHDFSPaths(processReference, INPUTS, pFile);
+        String      processId      = 
assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
+        AtlasEntity processsEntity = 
atlasClientV2.getEntityByGuid(processId).getEntity();
+
+        validateHDFSPaths(processsEntity, INPUTS, pFile);
+
+        List<AtlasObjectId> outputs = 
toAtlasObjectIdList(processsEntity.getAttribute(OUTPUTS));
 
-        List<Id> outputs = (List<Id>) processReference.get(OUTPUTS);
         assertEquals(outputs.size(), 1);
-        assertEquals(outputs.get(0).getId()._getId(), tableId);
+        assertEquals(outputs.get(0).getGuid(), tableId);
 
         int tableCount = 
atlasClient.listEntities(HiveDataTypes.HIVE_TABLE.getName()).size();
 
         //Now import using import tool - should be no-op. This also tests 
update since table exists
-        hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), 
DEFAULT_DB, tableName, true);
+        AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity();
+
+        hiveMetaStoreBridge.importTable(dbEntity, DEFAULT_DB, tableName, true);
+
         String tableId2 = assertTableIsRegistered(DEFAULT_DB, tableName);
         assertEquals(tableId2, tableId);
 
-        String processId2 = 
assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
-                AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
+        String processId2 = 
assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
         assertEquals(processId2, processId);
 
         //assert that table is de-duped and no new entity is created
@@ -72,18 +74,23 @@ public class HiveMetastoreBridgeIT extends HiveITBase {
     @Test
     public void testImportCreatedTable() throws Exception {
         String tableName = tableName();
-        String pFile = createTestDFSPath("parentPath");
+        String pFile     = createTestDFSPath("parentPath");
+
         runCommand(driverWithoutContext, String.format("create EXTERNAL table 
%s(id string) location '%s'", tableName, pFile));
+
         String dbId = assertDatabaseIsRegistered(DEFAULT_DB);
 
-        hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), 
DEFAULT_DB, tableName, true);
+        AtlasEntity dbEntity = atlasClientV2.getEntityByGuid(dbId).getEntity();
+
+        hiveMetaStoreBridge.importTable(dbEntity, DEFAULT_DB, tableName, true);
+
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
 
-        String processId = 
assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
-                AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
-        List<Id> outputs = (List<Id>) 
atlasClient.getEntity(processId).get(OUTPUTS);
+        String              processId     = 
assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), 
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
+        AtlasEntity         processEntity = 
atlasClientV2.getEntityByGuid(processId).getEntity();
+        List<AtlasObjectId> outputs       = 
toAtlasObjectIdList(processEntity.getAttribute(OUTPUTS));
+
         assertEquals(outputs.size(), 1);
-        assertEquals(outputs.get(0).getId()._getId(), tableId);
+        assertEquals(outputs.get(0).getGuid(), tableId);
     }
 }

Reply via email to