This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 60b44a9  ATLAS-3185: Create process execution for Impala integration
60b44a9 is described below

commit 60b44a93c4033991d160b8d4ac1af7723b10d564
Author: lina.li <[email protected]>
AuthorDate: Wed May 29 12:46:09 2019 -0700

    ATLAS-3185: Create process execution for Impala integration
    
    Signed-off-by: Sarath Subramanian <[email protected]>
---
 .../org/apache/atlas/impala/ImpalaLineageTool.java |   1 -
 .../atlas/impala/hook/AtlasImpalaHookContext.java  |   6 +-
 .../atlas/impala/hook/ImpalaIdentifierParser.java  |   3 -
 .../atlas/impala/hook/ImpalaLineageHook.java       |  18 ++-
 .../atlas/impala/hook/events/BaseImpalaEvent.java  |  44 +++++++-
 .../impala/hook/events/CreateImpalaProcess.java    |  33 ++++--
 .../apache/atlas/impala/ImpalaLineageITBase.java   | 125 +++++++++++++++++++++
 .../apache/atlas/impala/ImpalaLineageToolIT.java   | 117 +++++++++++++++++--
 .../atlas/impala/hook/ImpalaLineageHookIT.java     |  10 +-
 .../impalaMultipleInsertIntoAsSelect1.json         |  83 ++++++++++++++
 .../impalaMultipleInsertIntoAsSelect2.json         |  83 ++++++++++++++
 11 files changed, 493 insertions(+), 30 deletions(-)

diff --git 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
index 7c9abc8..6e6d6f1 100644
--- 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
+++ 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
@@ -18,7 +18,6 @@
 
 package org.apache.atlas.impala;
 
-import java.lang.Runnable;
 import org.apache.atlas.impala.hook.ImpalaLineageHook;
 
 import java.io.*;
diff --git 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
index 88faace..5b5f05a 100644
--- 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
+++ 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.atlas.impala.model.ImpalaOperationType;
 import org.apache.atlas.impala.model.ImpalaQuery;
 import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.commons.lang.StringUtils;
+
 
 /**
  * Contain the info related to an linear record from Impala
@@ -70,6 +70,10 @@ public class AtlasImpalaHookContext {
         return hook.getClusterName();
     }
 
+    public String getHostName() {
+        return hook.getHostName();
+    }
+
     public boolean isConvertHdfsPathToLowerCase() {
         return hook.isConvertHdfsPathToLowerCase();
     }
diff --git 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
index b9d6cbb..33e44f7 100644
--- 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
+++ 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
@@ -19,11 +19,8 @@
 package org.apache.atlas.impala.hook;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang.StringUtils;
 
diff --git 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index 232a569..6d65ae0 100644
--- 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++ 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -19,7 +19,8 @@
 package org.apache.atlas.impala.hook;
 
 import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
-
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import org.apache.atlas.hook.AtlasHook;
@@ -44,15 +45,24 @@ public class ImpalaLineageHook extends AtlasHook {
     public static final String CONF_CLUSTER_NAME                   = 
"atlas.cluster.name";
     public static final String CONF_REALM_NAME                     = 
"atlas.realm.name";
     public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE     = 
CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+    public static final String DEFAULT_HOST_NAME                   = 
"localhost";
 
     private static final String clusterName;
-    private  static final String realm;
+    private static final String realm;
     private static final boolean convertHdfsPathToLowerCase;
+    private static String hostName;
 
     static {
         clusterName                     = 
atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
         realm                           = 
atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME);  // what 
should default be ??
         convertHdfsPathToLowerCase      = 
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+
+        try {
+            hostName = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            LOG.warn("No hostname found. Setting the hostname to default value 
{}", DEFAULT_HOST_NAME, e);
+            hostName = DEFAULT_HOST_NAME;
+        }
     }
 
     public ImpalaLineageHook() {
@@ -122,6 +132,10 @@ public class ImpalaLineageHook extends AtlasHook {
         }
     }
 
+    public String getHostName() {
+        return hostName;
+    }
+
     private UserGroupInformation getUgiFromUserName(String userName)  throws 
IOException {
         String userPrincipal = userName.contains(REALM_SEPARATOR)? userName : 
userName + "@" + getRealm();
         Subject userSubject = new Subject(false, Sets.newHashSet(
diff --git 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
index c7604ba..d241b6a 100644
--- 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
+++ 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
 import org.apache.atlas.impala.hook.ImpalaOperationParser;
 import org.apache.atlas.impala.model.ImpalaDataType;
@@ -43,6 +44,8 @@ import 
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.type.AtlasTypeUtil;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,12 +78,17 @@ public abstract class BaseImpalaEvent {
     public static final String ATTRIBUTE_START_TIME                = 
"startTime";
     public static final String ATTRIBUTE_USER_NAME                 = 
"userName";
     public static final String ATTRIBUTE_QUERY_TEXT                = 
"queryText";
+    public static final String ATTRIBUTE_PROCESS                   = "process";
+    public static final String ATTRIBUTE_PROCESS_EXECUTIONS        = 
"processExecutions";
     public static final String ATTRIBUTE_QUERY_ID                  = "queryId";
     public static final String ATTRIBUTE_QUERY_PLAN                = 
"queryPlan";
     public static final String ATTRIBUTE_END_TIME                  = "endTime";
     public static final String ATTRIBUTE_RECENT_QUERIES            = 
"recentQueries";
     public static final String ATTRIBUTE_QUERY                     = "query";
     public static final String ATTRIBUTE_DEPENDENCY_TYPE           = 
"dependencyType";
+    public static final String ATTRIBUTE_HOSTNAME                  = 
"hostName";
+    public static final String EMPTY_ATTRIBUTE_VALUE               = "";
+
     public static final long   MILLIS_CONVERT_FACTOR               = 1000;
 
 
@@ -525,15 +533,43 @@ public abstract class BaseImpalaEvent {
         ret.setAttribute(ATTRIBUTE_NAME, queryStr);
         ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, 
context.getImpalaOperationType());
 
-        // the unit of timestamp from lineage record is in seconds. Convert to 
milliseconds to Atlas
-        ret.setAttribute(ATTRIBUTE_START_TIME, 
context.getLineageQuery().getTimestamp() * 
BaseImpalaEvent.MILLIS_CONVERT_FACTOR);
-        ret.setAttribute(ATTRIBUTE_END_TIME, 
context.getLineageQuery().getEndTime() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR);
+        // We are setting an empty value to these attributes, since now we 
have a new entity type called impala process
+        // execution which captures these values. We have to set empty values 
here because these attributes are
+        // mandatory attributes for impala process entity type.
+        ret.setAttribute(ATTRIBUTE_START_TIME, EMPTY_ATTRIBUTE_VALUE);
+        ret.setAttribute(ATTRIBUTE_END_TIME, EMPTY_ATTRIBUTE_VALUE);
+        ret.setAttribute(ATTRIBUTE_USER_NAME, EMPTY_ATTRIBUTE_VALUE);
+        ret.setAttribute(ATTRIBUTE_QUERY_TEXT, EMPTY_ATTRIBUTE_VALUE);
+        ret.setAttribute(ATTRIBUTE_QUERY_ID, EMPTY_ATTRIBUTE_VALUE);
+        ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
+        ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, 
Collections.singletonList(queryStr));
+
+        return ret;
+    }
 
+    protected AtlasEntity getImpalaProcessExecutionEntity(AtlasEntity 
impalaProcess) throws Exception {
+        AtlasEntity ret         = new 
AtlasEntity(ImpalaDataType.IMPALA_PROCESS_EXECUTION.getName());
+        String      queryStr    = context.getQueryStr();
+
+        if (queryStr != null) {
+            queryStr = queryStr.toLowerCase().trim();
+        }
+
+        Long startTime = context.getLineageQuery().getTimestamp() * 
BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
+        Long endTime = context.getLineageQuery().getEndTime() * 
BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
+
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString() +
+            QNAME_SEP_PROCESS + startTime.toString() +
+            QNAME_SEP_PROCESS + endTime.toString());
+        ret.setAttribute(ATTRIBUTE_NAME, queryStr + QNAME_SEP_PROCESS + 
startTime);
+        ret.setAttribute(ATTRIBUTE_START_TIME, startTime);
+        ret.setAttribute(ATTRIBUTE_END_TIME, endTime);
         ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
         ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
         ret.setAttribute(ATTRIBUTE_QUERY_ID, 
context.getLineageQuery().getQueryId());
         ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
-        ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, 
Collections.singletonList(queryStr));
+        ret.setAttribute(ATTRIBUTE_HOSTNAME, context.getHostName());
+        ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, 
AtlasTypeUtil.toAtlasRelatedObjectId(impalaProcess));
 
         return ret;
     }
diff --git 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
index 3071576..b7506a4 100644
--- 
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
+++ 
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
@@ -110,15 +110,28 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
 
         if (!inputs.isEmpty() || !outputs.isEmpty()) {
             AtlasEntity process = getImpalaProcessEntity(inputs, outputs);
-            if (process!= null && LOG.isDebugEnabled()) {
-                LOG.debug("get process entity with qualifiedName: {}", 
process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
-            }
+            if (process!= null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("get process entity with qualifiedName: {}",
+                        process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
 
-            ret.addEntity(process);
+                ret.addEntity(process);
 
-            processColumnLineage(process, ret);
+                AtlasEntity processExecution = 
getImpalaProcessExecutionEntity(process);
+                if (processExecution != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("get process executition entity with 
qualifiedName: {}",
+                            
processExecution.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                    }
+
+                    ret.addEntity(processExecution);
+                }
 
-            addProcessedEntities(ret);
+                processColumnLineage(process, ret);
+
+                addProcessedEntities(ret);
+            }
         } else {
             ret = null;
         }
@@ -154,8 +167,10 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
                 String outputColName = getQualifiedName(columnVertex);
                 AtlasEntity outputColumn = context.getEntity(outputColName);
 
-                LOG.debug("processColumnLineage(): target id = {}, target 
column name = {}",
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("processColumnLineage(): target id = {}, target 
column name = {}",
                         targetId, outputColName);
+                }
 
                 if (outputColumn == null) {
                     LOG.warn("column-lineage: non-existing output-column {}", 
outputColName);
@@ -219,7 +234,9 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
 
         for (AtlasEntity columnLineage : columnLineages) {
             String columnQualifiedName = 
(String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
-            LOG.debug("get column lineage entity with qualifiedName: {}", 
columnQualifiedName);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("get column lineage entity with qualifiedName: {}", 
columnQualifiedName);
+            }
 
             entities.addEntity(columnLineage);
         }
diff --git 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
index 0138d88..b8cbf6b 100644
--- 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
+++ 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
@@ -19,6 +19,7 @@
 package org.apache.atlas.impala;
 
 import static 
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME;
+import static 
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUERY_TEXT;
 import static 
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_RECENT_QUERIES;
 import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_DB;
 import static org.testng.Assert.assertEquals;
@@ -26,6 +27,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -33,8 +35,10 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
 import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
 import org.apache.atlas.impala.model.ImpalaDataType;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.utils.AuthenticationUtil;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
@@ -99,6 +103,7 @@ public class ImpalaLineageITBase {
 
     }
 
+    // return guid of the entity
     protected String assertEntityIsRegistered(final String typeName, final 
String property, final String value,
         final AssertPredicate assertPredicate) throws Exception {
         waitFor(80000, new Predicate() {
@@ -141,6 +146,25 @@ public class ImpalaLineageITBase {
         });
     }
 
+    protected String assertEntityIsRegisteredViaGuid(String guid,
+        final AssertPredicate assertPredicate) throws Exception {
+        waitFor(80000, new Predicate() {
+            @Override
+            public void evaluate() throws Exception {
+                AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = 
atlasClientV2.getEntityByGuid(guid);
+                AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
+                assertNotNull(entity);
+                if (assertPredicate != null) {
+                    assertPredicate.assertOnEntity(entity);
+                }
+
+            }
+        });
+        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = 
atlasClientV2.getEntityByGuid(guid);
+        AtlasEntity entity = atlasEntityWithExtInfo.getEntity();
+        return (String) entity.getGuid();
+    }
+
 
     protected String assertProcessIsRegistered(List<String> processQFNames, 
String queryString) throws Exception {
         try {
@@ -185,6 +209,82 @@ public class ImpalaLineageITBase {
         }
     }
 
+    /** Returns the guid of the execution of impalaProcess whose query text equals queryString (lower-cased, trimmed). */
+    private String assertProcessExecutionIsRegistered(AtlasEntity impalaProcess, final String queryString) throws Exception {
+        try {
+            final String expectedQueryText = queryString.toLowerCase().trim();
+            String guid = null;
+            List<AtlasObjectId> processExecutions = toAtlasObjectIdList(impalaProcess.getRelationshipAttribute(
+                BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));
+            Assert.assertNotNull(processExecutions, "process has no " + BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS);
+            for (AtlasObjectId processExecution : processExecutions) {
+                AtlasEntity entity = atlasClientV2.getEntityByGuid(processExecution.getGuid()).getEntity();
+                if (expectedQueryText.equals(String.valueOf(entity.getAttribute(ATTRIBUTE_QUERY_TEXT)))) {
+                    guid = entity.getGuid();
+                    break;
+                }
+            }
+            // Fail here with a clear message instead of polling Atlas for a guid that was never found.
+            Assert.assertNotNull(guid, "no process execution found with query text: " + expectedQueryText);
+            return assertEntityIsRegisteredViaGuid(guid, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(final AtlasEntity entity) throws Exception {
+                    Assert.assertEquals((String) entity.getAttribute(ATTRIBUTE_QUERY_TEXT), expectedQueryText);
+                }
+            });
+        } catch(Exception e) {
+            LOG.error("Exception : ", e);
+            throw e;
+        }
+    }
+
+    protected AtlasObjectId toAtlasObjectId(Object obj) {
+        final AtlasObjectId ret;
+
+        if (obj instanceof AtlasObjectId) {
+            ret = (AtlasObjectId) obj;
+        } else if (obj instanceof Map) {
+            ret = new AtlasObjectId((Map) obj);
+        } else if (obj != null) {
+            ret = new AtlasObjectId(obj.toString()); // guid
+        } else {
+            ret = null;
+        }
+
+        return ret;
+    }
+
+    /**
+     * Converts a relationship-attribute value into a list of object ids.
+     * A collection is converted element by element (null items are dropped); any
+     * other non-null value becomes a one-element list.
+     *
+     * @param obj a collection of ids/maps/guids, a single such value, or null
+     * @return the converted ids; an empty list (never null) when nothing converts
+     */
+    protected List<AtlasObjectId> toAtlasObjectIdList(Object obj) {
+        final List<AtlasObjectId> ret = new ArrayList<>();
+
+        if (obj instanceof Collection) {
+            for (Object item : (Collection<?>) obj) {
+                AtlasObjectId objId = toAtlasObjectId(item);
+                if (objId != null) {
+                    ret.add(objId);
+                }
+            }
+        } else {
+            AtlasObjectId objId = toAtlasObjectId(obj);
+
+            // A null value (attribute absent) yields an empty list so callers can
+            // iterate or call size() without a null check.
+            if (objId != null) {
+                ret.add(objId);
+            }
+        }
+        return ret;
+    }
+
+
     protected String assertDatabaseIsRegistered(String dbName) throws 
Exception {
         return assertDatabaseIsRegistered(dbName, null);
     }
@@ -227,6 +327,31 @@ public class ImpalaLineageITBase {
         return dbName + "." + tableName;
     }
 
+    protected AtlasEntity validateProcess(String processQFName, String 
queryString) throws Exception {
+        String      processId     = assertProcessIsRegistered(processQFName, 
queryString);
+        AtlasEntity processEntity = 
atlasClientV2.getEntityByGuid(processId).getEntity();
+
+        return processEntity;
+    }
+
+    protected AtlasEntity validateProcess(List<String> processQFNames, String 
queryString) throws Exception {
+        String      processId     = assertProcessIsRegistered(processQFNames, 
queryString);
+        AtlasEntity processEntity = 
atlasClientV2.getEntityByGuid(processId).getEntity();
+
+        return processEntity;
+    }
+
+    protected AtlasEntity validateProcessExecution(AtlasEntity impalaProcess, 
String queryString) throws Exception {
+        String      processExecutionId     = 
assertProcessExecutionIsRegistered(impalaProcess, queryString);
+        AtlasEntity processExecutionEntity = 
atlasClientV2.getEntityByGuid(processExecutionId).getEntity();
+        return processExecutionEntity;
+    }
+
+    protected int numberOfProcessExecutions(AtlasEntity impalaProcess) {
+        return toAtlasObjectIdList(impalaProcess.getRelationshipAttribute(
+            BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS)).size();
+    }
+
     public interface AssertPredicate {
         void assertOnEntity(AtlasEntity entity) throws Exception;
     }
diff --git 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
index 033a518..7f9a534 100644
--- 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
+++ 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -17,12 +17,17 @@
  */
 package org.apache.atlas.impala;
 
+import static 
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUERY_TEXT;
+
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
 import org.apache.atlas.impala.hook.ImpalaLineageHook;
 import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
 import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class ImpalaLineageToolIT extends ImpalaLineageITBase {
@@ -73,8 +78,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase 
{
 
             processQFName = processQFName.toLowerCase();
 
-            assertProcessIsRegistered(processQFName,
-                "create view db_1.view_1 as select count, id from 
db_1.table_1");
+            String queryString = "create view db_1.view_1 as select count, id 
from db_1.table_1";
+            AtlasEntity processEntity1 = validateProcess(processQFName, 
queryString);
+            AtlasEntity processExecutionEntity1 = 
validateProcessExecution(processEntity1, queryString);
+            AtlasObjectId process1 = 
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+                BaseImpalaEvent.ATTRIBUTE_PROCESS));
+            Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+            Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
 
         } catch (Exception e) {
             System.out.print("Appending file error");
@@ -136,8 +146,13 @@ public class ImpalaLineageToolIT extends 
ImpalaLineageITBase {
             // verify the process is saved in Atlas. the value is from info in 
IMPALA_4.
             // There is no createTime in lineage record, so we don't know the 
process qualified name
             // And can only verify the process is created for the given query.
-            assertProcessIsRegistered(processQFNames,"create view " + dbName + 
"." + targetTableName + " as select count, id from " + dbName + "." + 
sourceTableName);
-
+            String queryString = "create view " + dbName + "." + 
targetTableName + " as select count, id from " + dbName + "." + sourceTableName;
+            AtlasEntity processEntity1 = validateProcess(processQFNames, 
queryString);
+            AtlasEntity processExecutionEntity1 = 
validateProcessExecution(processEntity1, queryString);
+            AtlasObjectId process1 = 
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+                BaseImpalaEvent.ATTRIBUTE_PROCESS));
+            Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+            Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
         } catch (Exception e) {
             System.out.print("Appending file error");
         }
@@ -183,8 +198,13 @@ public class ImpalaLineageToolIT extends 
ImpalaLineageITBase {
 
         processQFName = processQFName.toLowerCase();
 
-        assertProcessIsRegistered(processQFName,
-            "create table " + dbName + "." + targetTableName + " as select 
count, id from " + dbName + "." + sourceTableName);
+        String queryString = "create table " + dbName + "." + targetTableName 
+ " as select count, id from " + dbName + "." + sourceTableName;
+        AtlasEntity processEntity1 = validateProcess(processQFName, 
queryString);
+        AtlasEntity processExecutionEntity1 = 
validateProcessExecution(processEntity1, queryString);
+        AtlasObjectId process1 = 
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+            BaseImpalaEvent.ATTRIBUTE_PROCESS));
+        Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+        Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
     }
 
     /**
@@ -227,8 +247,13 @@ public class ImpalaLineageToolIT extends 
ImpalaLineageITBase {
 
         processQFName = processQFName.toLowerCase();
 
-        assertProcessIsRegistered(processQFName,
-            "alter view " + dbName + "." + targetTableName + " as select 
count, id from " + dbName + "." + sourceTableName);
+        String queryString = "alter view " + dbName + "." + targetTableName + 
" as select count, id from " + dbName + "." + sourceTableName;
+        AtlasEntity processEntity1 = validateProcess(processQFName, 
queryString);
+        AtlasEntity processExecutionEntity1 = 
validateProcessExecution(processEntity1, queryString);
+        AtlasObjectId process1 = 
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+            BaseImpalaEvent.ATTRIBUTE_PROCESS));
+        Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+        Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
     }
 
     /**
@@ -272,7 +297,79 @@ public class ImpalaLineageToolIT extends 
ImpalaLineageITBase {
             CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + 
createTime2;
         String processQFName = "QUERY:" + sourceQFName.toLowerCase() + 
"->:INSERT:" + targetQFName.toLowerCase();
 
-        assertProcessIsRegistered(processQFName,
-            "insert into table " + dbName + "." + targetTableName + " (count, 
id) select count, id from " + dbName + "." + sourceTableName);
+        String queryString = "insert into table " + dbName + "." + 
targetTableName + " (count, id) select count, id from " + dbName + "." + 
sourceTableName;
+        AtlasEntity processEntity1 = validateProcess(processQFName, 
queryString);
+        AtlasEntity processExecutionEntity1 = 
validateProcessExecution(processEntity1, queryString);
+        AtlasObjectId process1 = 
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+            BaseImpalaEvent.ATTRIBUTE_PROCESS));
+        Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+        Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
+    }
+
+    /**
+     * This tests
+     * 1) ImpalaLineageTool can parse one lineage file that contains multiple "insert into" command lineages,
+     *    where the table vertex has a createTime.
+     * 2) Lineage is sent to Atlas
+     * 3) These lineages can be retrieved from Atlas
+     */
+    @Test
+    public void testMultipleInsertIntoAsSelectFromFile() throws Exception {
+        String IMPALA = dir + "impalaMultipleInsertIntoAsSelect1.json";
+        String IMPALA_WAL = dir + "WALimpala.wal";
+
+        ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+        // create database and tables to simulate Impala behavior that Impala 
updates metadata
+        // to HMS and HMSHook sends the metadata to Atlas, which has to happen 
before
+        // Atlas can handle lineage notification
+        String dbName = "db_6";
+        createDatabase(dbName);
+
+        String sourceTableName = "table_1";
+        createTable(dbName, sourceTableName,"(id string, count int)", false);
+
+        String targetTableName = "table_2";
+        createTable(dbName, targetTableName,"(count int, id string, int_col 
int)", false);
+
+        // process lineage record, and send corresponding notification to Atlas
+        String[] args = new String[]{"-d", "./", "-p", "impala"};
+        ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, 
IMPALA_WAL);
+
+        // re-run the same lineage record, should have the same process entity 
and another process execution entity
+        Thread.sleep(500);
+        IMPALA = dir + "impalaMultipleInsertIntoAsSelect2.json";
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, 
IMPALA_WAL);
+
+        // verify the process is saved in Atlas
+        // the value is from info in IMPALA_4.
+        String createTime1 = new 
Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
+        String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
+        String sourceQFName = dbName + "." + sourceTableName + 
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + 
createTime1;
+        String targetQFName = dbName + "." + targetTableName + 
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + 
createTime2;
+        String processQFName = "QUERY:" + sourceQFName.toLowerCase() + 
"->:INSERT:" + targetQFName.toLowerCase();
+
+        String queryString = "insert into table " + dbName + "." + 
targetTableName + " (count, id) select count, id from " + dbName + "." + 
sourceTableName;
+        queryString = queryString.toLowerCase().trim();
+        String queryString2 = queryString;
+        AtlasEntity processEntity1 = validateProcess(processQFName, 
queryString);
+
+        List<AtlasObjectId> processExecutions = 
toAtlasObjectIdList(processEntity1.getRelationshipAttribute(
+            BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));
+        Assert.assertEquals(processExecutions.size(), 2);
+        for (AtlasObjectId processExecutionId : processExecutions) {
+            AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = 
atlasClientV2.
+                getEntityByGuid(processExecutionId.getGuid());
+
+            AtlasEntity processExecutionEntity = 
atlasEntityWithExtInfo.getEntity();
+            String entityQueryText = 
String.valueOf(processExecutionEntity.getAttribute(ATTRIBUTE_QUERY_TEXT)).toLowerCase().trim();
+            if (!(queryString.equalsIgnoreCase(entityQueryText) || 
queryString2.equalsIgnoreCase(entityQueryText))) {
+                String errorMessage = String.format("process query text '%s' 
does not match expected value of '%s' or '%s'", entityQueryText, queryString, 
queryString2);
+                Assert.assertTrue(false, errorMessage);
+            }
+        }
     }
 }
\ No newline at end of file
diff --git 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
index 86801e3..6156208 100644
--- 
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
+++ 
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
@@ -28,6 +28,9 @@ import org.apache.atlas.impala.model.ImpalaVertexType;
 import org.apache.atlas.impala.model.LineageEdge;
 import org.apache.atlas.impala.model.ImpalaQuery;
 import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
@@ -135,7 +138,12 @@ public class ImpalaLineageHookIT extends 
ImpalaLineageITBase {
 
             processQFName = processQFName.toLowerCase();
 
-            assertProcessIsRegistered(processQFName, queryObj.getQueryText());
+            AtlasEntity processEntity1 = validateProcess(processQFName, 
queryObj.getQueryText());
+            AtlasEntity processExecutionEntity1 = 
validateProcessExecution(processEntity1, queryObj.getQueryText());
+            AtlasObjectId process1 = 
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+                BaseImpalaEvent.ATTRIBUTE_PROCESS));
+            Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+            Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
         } catch (Exception ex) {
             LOG.error("process create_view failed: ", ex);
             assertFalse(true);
diff --git 
a/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect1.json
 
b/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect1.json
new file mode 100644
index 0000000..4e27837
--- /dev/null
+++ 
b/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect1.json
@@ -0,0 +1,83 @@
+{
+  "queryText":"insert into table db_6.table_2 (count, id) select count, id from db_6.table_1",
+  "queryId":"3a441d0c130962f8:7f634aec00000000",
+  "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+  "user":"admin",
+  "timestamp":1554750072,
+  "endTime":1554750554,
+  "edges":[
+    {
+      "sources":[
+        1
+      ],
+      "targets":[
+        0
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+        3
+      ],
+      "targets":[
+        2
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+      ],
+      "targets":[
+        4
+      ],
+      "edgeType":"PROJECTION"
+    }
+  ],
+  "vertices":[
+    {
+      "id":0,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_2.count",
+      "metadata": {
+        "tableName": "db_6.table_2",
+        "tableCreateTime": 1554750072
+      }
+    },
+    {
+      "id":1,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_1.count",
+      "metadata": {
+        "tableName": "db_6.table_1",
+        "tableCreateTime": 1554750070
+      }
+    },
+    {
+      "id":2,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_2.id",
+      "metadata": {
+        "tableName": "db_6.table_2",
+        "tableCreateTime": 1554750072
+      }
+    },
+    {
+      "id":3,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_1.id",
+      "metadata": {
+        "tableName": "db_6.table_1",
+        "tableCreateTime": 1554750070
+      }
+    },
+    {
+      "id":4,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_2.int_col",
+      "metadata": {
+        "tableName": "db_6.table_2",
+        "tableCreateTime": 1554750072
+      }
+    }
+  ]
+}
diff --git 
a/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect2.json
 
b/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect2.json
new file mode 100644
index 0000000..ece6535
--- /dev/null
+++ 
b/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect2.json
@@ -0,0 +1,83 @@
+{
+  "queryText":"insert into table db_6.table_2 (count, id) select count, id from db_6.table_1",
+  "queryId":"3a441d0c130962f8:7f634aec00000000",
+  "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+  "user":"admin",
+  "timestamp":1554750082,
+  "endTime":1554750584,
+  "edges":[
+    {
+      "sources":[
+        1
+      ],
+      "targets":[
+        0
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+        3
+      ],
+      "targets":[
+        2
+      ],
+      "edgeType":"PROJECTION"
+    },
+    {
+      "sources":[
+      ],
+      "targets":[
+        4
+      ],
+      "edgeType":"PROJECTION"
+    }
+  ],
+  "vertices":[
+    {
+      "id":0,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_2.count",
+      "metadata": {
+        "tableName": "db_6.table_2",
+        "tableCreateTime": 1554750072
+      }
+    },
+    {
+      "id":1,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_1.count",
+      "metadata": {
+        "tableName": "db_6.table_1",
+        "tableCreateTime": 1554750070
+      }
+    },
+    {
+      "id":2,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_2.id",
+      "metadata": {
+        "tableName": "db_6.table_2",
+        "tableCreateTime": 1554750072
+      }
+    },
+    {
+      "id":3,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_1.id",
+      "metadata": {
+        "tableName": "db_6.table_1",
+        "tableCreateTime": 1554750070
+      }
+    },
+    {
+      "id":4,
+      "vertexType":"COLUMN",
+      "vertexId":"db_6.table_2.int_col",
+      "metadata": {
+        "tableName": "db_6.table_2",
+        "tableCreateTime": 1554750072
+      }
+    }
+  ]
+}
\ No newline at end of file

Reply via email to