This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 60b44a9 ATLAS-3185: Create process execution for Impala integration
60b44a9 is described below
commit 60b44a93c4033991d160b8d4ac1af7723b10d564
Author: lina.li <[email protected]>
AuthorDate: Wed May 29 12:46:09 2019 -0700
ATLAS-3185: Create process execution for Impala integration
Signed-off-by: Sarath Subramanian <[email protected]>
---
.../org/apache/atlas/impala/ImpalaLineageTool.java | 1 -
.../atlas/impala/hook/AtlasImpalaHookContext.java | 6 +-
.../atlas/impala/hook/ImpalaIdentifierParser.java | 3 -
.../atlas/impala/hook/ImpalaLineageHook.java | 18 ++-
.../atlas/impala/hook/events/BaseImpalaEvent.java | 44 +++++++-
.../impala/hook/events/CreateImpalaProcess.java | 33 ++++--
.../apache/atlas/impala/ImpalaLineageITBase.java | 125 +++++++++++++++++++++
.../apache/atlas/impala/ImpalaLineageToolIT.java | 117 +++++++++++++++++--
.../atlas/impala/hook/ImpalaLineageHookIT.java | 10 +-
.../impalaMultipleInsertIntoAsSelect1.json | 83 ++++++++++++++
.../impalaMultipleInsertIntoAsSelect2.json | 83 ++++++++++++++
11 files changed, 493 insertions(+), 30 deletions(-)
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
index 7c9abc8..6e6d6f1 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
@@ -18,7 +18,6 @@
package org.apache.atlas.impala;
-import java.lang.Runnable;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
import java.io.*;
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
index 88faace..5b5f05a 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.commons.lang.StringUtils;
+
/**
* Contain the info related to an linear record from Impala
@@ -70,6 +70,10 @@ public class AtlasImpalaHookContext {
return hook.getClusterName();
}
+ /**
+ * Returns the host name reported by the owning hook ({@code hook.getHostName()}).
+ */
+ public String getHostName() {
+ return hook.getHostName();
+ }
+
+ /** Returns the owning hook's setting for converting HDFS paths to lower case. */
public boolean isConvertHdfsPathToLowerCase() {
return hook.isConvertHdfsPathToLowerCase();
}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
index b9d6cbb..33e44f7 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
@@ -19,11 +19,8 @@
package org.apache.atlas.impala.hook;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index 232a569..6d65ae0 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -19,7 +19,8 @@
package org.apache.atlas.impala.hook;
import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
-
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import com.google.common.collect.Sets;
import java.io.IOException;
import org.apache.atlas.hook.AtlasHook;
@@ -44,15 +45,24 @@ public class ImpalaLineageHook extends AtlasHook {
public static final String CONF_CLUSTER_NAME =
"atlas.cluster.name";
public static final String CONF_REALM_NAME =
"atlas.realm.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE =
CONF_PREFIX + "hdfs_path.convert_to_lowercase";
+    // Fallback host name used when the local host name cannot be resolved.
+    public static final String DEFAULT_HOST_NAME = "localhost";
private static final String clusterName;
- private static final String realm;
+ private static final String realm;
private static final boolean convertHdfsPathToLowerCase;
+    // Resolved once during class initialization (see resolveLocalHostName); final so it cannot be reassigned later.
+    private static final String hostName;
static {
clusterName =
atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
realm =
atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what
should default be ??
convertHdfsPathToLowerCase =
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+
+        hostName = resolveLocalHostName();
}
+
+    /**
+     * Resolves the name of the local host.
+     *
+     * @return {@code InetAddress.getLocalHost().getHostName()}, or {@link #DEFAULT_HOST_NAME} when the local
+     *         host cannot be resolved (the failure is logged as a warning together with its stack trace)
+     */
+    private static String resolveLocalHostName() {
+        try {
+            return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            LOG.warn("No hostname found. Setting the hostname to default value {}", DEFAULT_HOST_NAME, e);
+            return DEFAULT_HOST_NAME;
+        }
+    }
public ImpalaLineageHook() {
@@ -122,6 +132,10 @@ public class ImpalaLineageHook extends AtlasHook {
}
}
+ /**
+ * Returns the local host name resolved while this class was being initialized, or DEFAULT_HOST_NAME
+ * ("localhost") when that resolution failed with UnknownHostException.
+ */
+ public String getHostName() {
+ return hostName;
+ }
+
private UserGroupInformation getUgiFromUserName(String userName) throws
IOException {
String userPrincipal = userName.contains(REALM_SEPARATOR)? userName :
userName + "@" + getRealm();
Subject userSubject = new Subject(false, Sets.newHashSet(
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
index c7604ba..d241b6a 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaOperationParser;
import org.apache.atlas.impala.model.ImpalaDataType;
@@ -43,6 +44,8 @@ import
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.type.AtlasTypeUtil;
+
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,12 +78,17 @@ public abstract class BaseImpalaEvent {
public static final String ATTRIBUTE_START_TIME =
"startTime";
public static final String ATTRIBUTE_USER_NAME =
"userName";
public static final String ATTRIBUTE_QUERY_TEXT =
"queryText";
+ public static final String ATTRIBUTE_PROCESS = "process";
+ public static final String ATTRIBUTE_PROCESS_EXECUTIONS =
"processExecutions";
public static final String ATTRIBUTE_QUERY_ID = "queryId";
public static final String ATTRIBUTE_QUERY_PLAN =
"queryPlan";
public static final String ATTRIBUTE_END_TIME = "endTime";
public static final String ATTRIBUTE_RECENT_QUERIES =
"recentQueries";
public static final String ATTRIBUTE_QUERY = "query";
public static final String ATTRIBUTE_DEPENDENCY_TYPE =
"dependencyType";
+ public static final String ATTRIBUTE_HOSTNAME =
"hostName";
+ public static final String EMPTY_ATTRIBUTE_VALUE = "";
+
public static final long MILLIS_CONVERT_FACTOR = 1000;
@@ -525,15 +533,43 @@ public abstract class BaseImpalaEvent {
ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE,
context.getImpalaOperationType());
- // the unit of timestamp from lineage record is in seconds. Convert to
milliseconds to Atlas
- ret.setAttribute(ATTRIBUTE_START_TIME,
context.getLineageQuery().getTimestamp() *
BaseImpalaEvent.MILLIS_CONVERT_FACTOR);
- ret.setAttribute(ATTRIBUTE_END_TIME,
context.getLineageQuery().getEndTime() * BaseImpalaEvent.MILLIS_CONVERT_FACTOR);
+ // We are setting an empty value to these attributes, since now we
have a new entity type called impala process
+ // execution which captures these values. We have to set empty values
here because these attributes are
+ // mandatory attributes for impala process entity type.
+ ret.setAttribute(ATTRIBUTE_START_TIME, EMPTY_ATTRIBUTE_VALUE);
+ ret.setAttribute(ATTRIBUTE_END_TIME, EMPTY_ATTRIBUTE_VALUE);
+ ret.setAttribute(ATTRIBUTE_USER_NAME, EMPTY_ATTRIBUTE_VALUE);
+ ret.setAttribute(ATTRIBUTE_QUERY_TEXT, EMPTY_ATTRIBUTE_VALUE);
+ ret.setAttribute(ATTRIBUTE_QUERY_ID, EMPTY_ATTRIBUTE_VALUE);
+ ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
+ ret.setAttribute(ATTRIBUTE_RECENT_QUERIES,
Collections.singletonList(queryStr));
+
+ return ret;
+ }
+
+    /**
+     * Builds an impala process execution entity for the current lineage record and links it to
+     * {@code impalaProcess} through the "process" relationship attribute.
+     *
+     * The execution's qualifiedName is the process qualifiedName followed by the start and end times
+     * (in milliseconds), each preceded by QNAME_SEP_PROCESS, so runs with different start/end times get
+     * distinct qualifiedNames.
+     *
+     * @param impalaProcess the process entity this execution belongs to; must carry a qualifiedName
+     * @return the new process execution entity
+     * @throws IllegalStateException if {@code impalaProcess} has no qualifiedName attribute
+     */
+    protected AtlasEntity getImpalaProcessExecutionEntity(AtlasEntity impalaProcess) throws Exception {
+        AtlasEntity ret = new AtlasEntity(ImpalaDataType.IMPALA_PROCESS_EXECUTION.getName());
+        String queryStr = context.getQueryStr();
+
+        if (queryStr != null) {
+            queryStr = queryStr.toLowerCase().trim();
+        }
+
+        Object processQualifiedName = impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+
+        if (processQualifiedName == null) {
+            throw new IllegalStateException("impala process entity has no " + ATTRIBUTE_QUALIFIED_NAME
+                    + "; cannot build its process execution entity");
+        }
+
+        String processQName = processQualifiedName.toString();
+
+        // Timestamps in the Impala lineage record are in seconds; Atlas stores milliseconds.
+        long startTime = context.getLineageQuery().getTimestamp() * MILLIS_CONVERT_FACTOR;
+        long endTime   = context.getLineageQuery().getEndTime() * MILLIS_CONVERT_FACTOR;
+
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
+                processQName + QNAME_SEP_PROCESS + startTime + QNAME_SEP_PROCESS + endTime);
+        ret.setAttribute(ATTRIBUTE_NAME, queryStr + QNAME_SEP_PROCESS + startTime);
+        ret.setAttribute(ATTRIBUTE_START_TIME, startTime);
+        ret.setAttribute(ATTRIBUTE_END_TIME, endTime);
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr);
ret.setAttribute(ATTRIBUTE_QUERY_ID,
context.getLineageQuery().getQueryId());
ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported");
- ret.setAttribute(ATTRIBUTE_RECENT_QUERIES,
Collections.singletonList(queryStr));
+        ret.setAttribute(ATTRIBUTE_HOSTNAME, context.getHostName());
+        ret.setRelationshipAttribute(ATTRIBUTE_PROCESS, AtlasTypeUtil.toAtlasRelatedObjectId(impalaProcess));
return ret;
}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
index 3071576..b7506a4 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
@@ -110,15 +110,28 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (!inputs.isEmpty() || !outputs.isEmpty()) {
AtlasEntity process = getImpalaProcessEntity(inputs, outputs);
- if (process!= null && LOG.isDebugEnabled()) {
- LOG.debug("get process entity with qualifiedName: {}",
process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
- }
+ if (process!= null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get process entity with qualifiedName: {}",
+ process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
- ret.addEntity(process);
+ ret.addEntity(process);
- processColumnLineage(process, ret);
+ AtlasEntity processExecution =
getImpalaProcessExecutionEntity(process);
+ if (processExecution != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get process executition entity with
qualifiedName: {}",
+
processExecution.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ ret.addEntity(processExecution);
+ }
- addProcessedEntities(ret);
+ processColumnLineage(process, ret);
+
+ addProcessedEntities(ret);
+ }
} else {
ret = null;
}
@@ -154,8 +167,10 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
String outputColName = getQualifiedName(columnVertex);
AtlasEntity outputColumn = context.getEntity(outputColName);
- LOG.debug("processColumnLineage(): target id = {}, target
column name = {}",
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processColumnLineage(): target id = {}, target
column name = {}",
targetId, outputColName);
+ }
if (outputColumn == null) {
LOG.warn("column-lineage: non-existing output-column {}",
outputColName);
@@ -219,7 +234,9 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
for (AtlasEntity columnLineage : columnLineages) {
String columnQualifiedName =
(String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
- LOG.debug("get column lineage entity with qualifiedName: {}",
columnQualifiedName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get column lineage entity with qualifiedName: {}",
columnQualifiedName);
+ }
entities.addEntity(columnLineage);
}
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
index 0138d88..b8cbf6b 100644
---
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
@@ -19,6 +19,7 @@
package org.apache.atlas.impala;
import static
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME;
+import static
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUERY_TEXT;
import static
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_RECENT_QUERIES;
import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.HIVE_TYPE_DB;
import static org.testng.Assert.assertEquals;
@@ -26,6 +27,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -33,8 +35,10 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.model.ImpalaDataType;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
@@ -99,6 +103,7 @@ public class ImpalaLineageITBase {
}
+ // return guid of the entity
protected String assertEntityIsRegistered(final String typeName, final
String property, final String value,
final AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new Predicate() {
@@ -141,6 +146,25 @@ public class ImpalaLineageITBase {
});
}
+    /**
+     * Polls Atlas (through waitFor with an 80000 budget) until the entity with {@code guid} can be fetched
+     * and, when supplied, passes {@code assertPredicate}; then fetches it once more and returns its guid.
+     *
+     * @param guid            guid of the entity to check
+     * @param assertPredicate optional extra assertions on the fetched entity; may be null
+     * @return the guid reported by the fetched entity
+     * @throws Exception propagated from waitFor or from the Atlas client
+     */
+    protected String assertEntityIsRegisteredViaGuid(String guid,
+                                                     final AssertPredicate assertPredicate) throws Exception {
+        waitFor(80000, new Predicate() {
+            @Override
+            public void evaluate() throws Exception {
+                AtlasEntity fetched = atlasClientV2.getEntityByGuid(guid).getEntity();
+
+                assertNotNull(fetched);
+
+                if (assertPredicate == null) {
+                    return;
+                }
+
+                assertPredicate.assertOnEntity(fetched);
+            }
+        });
+
+        return atlasClientV2.getEntityByGuid(guid).getEntity().getGuid();
+    }
+
protected String assertProcessIsRegistered(List<String> processQFNames,
String queryString) throws Exception {
try {
@@ -185,6 +209,82 @@ public class ImpalaLineageITBase {
}
}
+    /**
+     * Finds, among the process executions referenced by {@code impalaProcess}, the one whose queryText equals
+     * the lower-cased, trimmed {@code queryString}, then verifies it through assertEntityIsRegisteredViaGuid.
+     *
+     * Fails the test immediately when no referenced execution has the expected query text. Such an execution
+     * cannot appear later, because the reference list comes from the already-fetched {@code impalaProcess}.
+     *
+     * @param impalaProcess process entity whose processExecutions relationship attribute is inspected
+     * @param queryString   expected query text (compared after toLowerCase().trim())
+     * @return guid of the matching process execution entity
+     * @throws Exception from the Atlas client or from the verification step (logged, then rethrown)
+     */
+    private String assertProcessExecutionIsRegistered(AtlasEntity impalaProcess, final String queryString) throws Exception {
+        try {
+            final String expectedQueryText = queryString.toLowerCase().trim();
+            List<AtlasObjectId> processExecutions = toAtlasObjectIdList(impalaProcess.getRelationshipAttribute(
+                    BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));
+            String guid = null;
+
+            if (processExecutions != null) {
+                for (AtlasObjectId processExecution : processExecutions) {
+                    AtlasEntity entity = atlasClientV2.getEntityByGuid(processExecution.getGuid()).getEntity();
+
+                    if (expectedQueryText.equals(String.valueOf(entity.getAttribute(ATTRIBUTE_QUERY_TEXT)))) {
+                        guid = entity.getGuid();
+                        break;
+                    }
+                }
+            }
+
+            if (guid == null) {
+                fail("No process execution with queryText '" + expectedQueryText
+                        + "' is referenced by process " + impalaProcess.getGuid());
+            }
+
+            return assertEntityIsRegisteredViaGuid(guid, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(final AtlasEntity entity) throws Exception {
+                    String queryText = (String) entity.getAttribute(ATTRIBUTE_QUERY_TEXT);
+                    assertEquals(queryText, expectedQueryText);
+                }
+            });
+        } catch (Exception e) {
+            LOG.error("Exception : ", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Converts a relationship-attribute value into an AtlasObjectId.
+     * An AtlasObjectId is returned as-is, a Map is passed to the AtlasObjectId map constructor, and any other
+     * non-null value is converted with toString() and treated as a guid. Returns null for null input.
+     */
+    protected AtlasObjectId toAtlasObjectId(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+
+        if (obj instanceof AtlasObjectId) {
+            return (AtlasObjectId) obj;
+        }
+
+        if (obj instanceof Map) {
+            return new AtlasObjectId((Map) obj);
+        }
+
+        return new AtlasObjectId(obj.toString()); // guid
+    }
+
+    /**
+     * Converts a relationship-attribute value into a list of AtlasObjectIds.
+     * A Collection is converted element by element and any other value is converted as a single reference,
+     * both via toAtlasObjectId. Values that convert to null are skipped.
+     *
+     * @param obj relationship-attribute value; may be null
+     * @return the converted ids. This is never null: it is empty when nothing converts, so callers can call
+     *         size() or iterate without a null check.
+     */
+    protected List<AtlasObjectId> toAtlasObjectIdList(Object obj) {
+        final List<AtlasObjectId> ret;
+
+        if (obj instanceof Collection) {
+            Collection<?> coll = (Collection<?>) obj;
+
+            ret = new ArrayList<>(coll.size());
+
+            for (Object item : coll) {
+                AtlasObjectId objId = toAtlasObjectId(item);
+
+                if (objId != null) {
+                    ret.add(objId);
+                }
+            }
+        } else {
+            ret = new ArrayList<>(1);
+
+            AtlasObjectId objId = toAtlasObjectId(obj);
+
+            if (objId != null) {
+                ret.add(objId);
+            }
+        }
+
+        return ret;
+    }
+
+
+ /** Delegates to the two-argument overload, passing null as the second argument. */
protected String assertDatabaseIsRegistered(String dbName) throws
Exception {
return assertDatabaseIsRegistered(dbName, null);
}
@@ -227,6 +327,31 @@ public class ImpalaLineageITBase {
return dbName + "." + tableName;
}
+    /**
+     * Runs assertProcessIsRegistered(processQFName, queryString) and returns the process entity fetched
+     * from Atlas by the guid it reports.
+     */
+    protected AtlasEntity validateProcess(String processQFName, String queryString) throws Exception {
+        String guid = assertProcessIsRegistered(processQFName, queryString);
+
+        return atlasClientV2.getEntityByGuid(guid).getEntity();
+    }
+
+    /**
+     * Runs assertProcessIsRegistered(processQFNames, queryString) and returns the process entity fetched
+     * from Atlas by the guid it reports.
+     */
+    protected AtlasEntity validateProcess(List<String> processQFNames, String queryString) throws Exception {
+        String guid = assertProcessIsRegistered(processQFNames, queryString);
+
+        return atlasClientV2.getEntityByGuid(guid).getEntity();
+    }
+
+    /**
+     * Locates the process execution of {@code impalaProcess} whose query text matches {@code queryString}
+     * (via assertProcessExecutionIsRegistered) and returns it as fetched from Atlas.
+     */
+    protected AtlasEntity validateProcessExecution(AtlasEntity impalaProcess, String queryString) throws Exception {
+        String guid = assertProcessExecutionIsRegistered(impalaProcess, queryString);
+
+        return atlasClientV2.getEntityByGuid(guid).getEntity();
+    }
+
+    /**
+     * Counts the process executions referenced by the processExecutions relationship attribute of
+     * {@code impalaProcess}.
+     *
+     * @return the number of referenced executions, or 0 when the attribute is absent
+     */
+    protected int numberOfProcessExecutions(AtlasEntity impalaProcess) {
+        List<AtlasObjectId> processExecutions = toAtlasObjectIdList(impalaProcess.getRelationshipAttribute(
+                BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));
+
+        return processExecutions == null ? 0 : processExecutions.size();
+    }
+
+ /** Callback for additional assertions on an entity fetched from Atlas. */
public interface AssertPredicate {
void assertOnEntity(AtlasEntity entity) throws Exception;
}
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
index 033a518..7f9a534 100644
---
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -17,12 +17,17 @@
*/
package org.apache.atlas.impala;
+import static
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUERY_TEXT;
+
import java.util.ArrayList;
import java.util.List;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class ImpalaLineageToolIT extends ImpalaLineageITBase {
@@ -73,8 +78,13 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase
{
processQFName = processQFName.toLowerCase();
- assertProcessIsRegistered(processQFName,
- "create view db_1.view_1 as select count, id from
db_1.table_1");
+ String queryString = "create view db_1.view_1 as select count, id
from db_1.table_1";
+ AtlasEntity processEntity1 = validateProcess(processQFName,
queryString);
+ AtlasEntity processExecutionEntity1 =
validateProcessExecution(processEntity1, queryString);
+ AtlasObjectId process1 =
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+ BaseImpalaEvent.ATTRIBUTE_PROCESS));
+ Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+ Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
} catch (Exception e) {
System.out.print("Appending file error");
@@ -136,8 +146,13 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
// verify the process is saved in Atlas. the value is from info in
IMPALA_4.
// There is no createTime in lineage record, so we don't know the
process qualified name
// And can only verify the process is created for the given query.
- assertProcessIsRegistered(processQFNames,"create view " + dbName +
"." + targetTableName + " as select count, id from " + dbName + "." +
sourceTableName);
-
+ String queryString = "create view " + dbName + "." +
targetTableName + " as select count, id from " + dbName + "." + sourceTableName;
+ AtlasEntity processEntity1 = validateProcess(processQFNames,
queryString);
+ AtlasEntity processExecutionEntity1 =
validateProcessExecution(processEntity1, queryString);
+ AtlasObjectId process1 =
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+ BaseImpalaEvent.ATTRIBUTE_PROCESS));
+ Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+ Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
} catch (Exception e) {
System.out.print("Appending file error");
}
@@ -183,8 +198,13 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
processQFName = processQFName.toLowerCase();
- assertProcessIsRegistered(processQFName,
- "create table " + dbName + "." + targetTableName + " as select
count, id from " + dbName + "." + sourceTableName);
+ String queryString = "create table " + dbName + "." + targetTableName
+ " as select count, id from " + dbName + "." + sourceTableName;
+ AtlasEntity processEntity1 = validateProcess(processQFName,
queryString);
+ AtlasEntity processExecutionEntity1 =
validateProcessExecution(processEntity1, queryString);
+ AtlasObjectId process1 =
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+ BaseImpalaEvent.ATTRIBUTE_PROCESS));
+ Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+ Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
}
/**
@@ -227,8 +247,13 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
processQFName = processQFName.toLowerCase();
- assertProcessIsRegistered(processQFName,
- "alter view " + dbName + "." + targetTableName + " as select
count, id from " + dbName + "." + sourceTableName);
+ String queryString = "alter view " + dbName + "." + targetTableName +
" as select count, id from " + dbName + "." + sourceTableName;
+ AtlasEntity processEntity1 = validateProcess(processQFName,
queryString);
+ AtlasEntity processExecutionEntity1 =
validateProcessExecution(processEntity1, queryString);
+ AtlasObjectId process1 =
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+ BaseImpalaEvent.ATTRIBUTE_PROCESS));
+ Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+ Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
}
/**
@@ -272,7 +297,79 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime2;
String processQFName = "QUERY:" + sourceQFName.toLowerCase() +
"->:INSERT:" + targetQFName.toLowerCase();
- assertProcessIsRegistered(processQFName,
- "insert into table " + dbName + "." + targetTableName + " (count,
id) select count, id from " + dbName + "." + sourceTableName);
+ String queryString = "insert into table " + dbName + "." +
targetTableName + " (count, id) select count, id from " + dbName + "." +
sourceTableName;
+ AtlasEntity processEntity1 = validateProcess(processQFName,
queryString);
+ AtlasEntity processExecutionEntity1 =
validateProcessExecution(processEntity1, queryString);
+ AtlasObjectId process1 =
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+ BaseImpalaEvent.ATTRIBUTE_PROCESS));
+ Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+ Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
+ }
+
+    /**
+     * This tests
+     * 1) ImpalaLineageTool can parse two lineage files that record the same "insert into ... select" query
+     *    executed at different times; the table vertices carry tableCreateTime metadata.
+     * 2) Both lineages are sent to Atlas.
+     * 3) Atlas holds a single process entity for the query, with two process execution entities
+     *    that both carry the query text.
+     */
+    @Test
+    public void testMultipleInsertIntoAsSelectFromFile() throws Exception {
+        String IMPALA = dir + "impalaMultipleInsertIntoAsSelect1.json";
+        String IMPALA_WAL = dir + "WALimpala.wal";
+
+        ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
+
+        // create database and tables to simulate Impala behavior that Impala updates metadata
+        // to HMS and HMSHook sends the metadata to Atlas, which has to happen before
+        // Atlas can handle lineage notification
+        String dbName = "db_6";
+        createDatabase(dbName);
+
+        String sourceTableName = "table_1";
+        createTable(dbName, sourceTableName, "(id string, count int)", false);
+
+        String targetTableName = "table_2";
+        createTable(dbName, targetTableName, "(count int, id string, int_col int)", false);
+
+        // process lineage record, and send corresponding notification to Atlas
+        String[] args = new String[]{"-d", "./", "-p", "impala"};
+        ImpalaLineageTool toolInstance = new ImpalaLineageTool(args);
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+        // import the same query again with later timestamps: the process entity should be reused
+        // and a second process execution entity should be created
+        Thread.sleep(500);
+        IMPALA = dir + "impalaMultipleInsertIntoAsSelect2.json";
+        toolInstance.importHImpalaEntities(impalaLineageHook, IMPALA, IMPALA_WAL);
+
+        // build the expected qualified names of the source/target tables and of the process
+        String createTime1 = String.valueOf(TABLE_CREATE_TIME_SOURCE * 1000);
+        String createTime2 = String.valueOf(TABLE_CREATE_TIME * 1000);
+        String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1;
+        String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+            CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
+        String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
+
+        String queryString = ("insert into table " + dbName + "." + targetTableName +
+            " (count, id) select count, id from " + dbName + "." + sourceTableName).toLowerCase().trim();
+        AtlasEntity processEntity1 = validateProcess(processQFName, queryString);
+
+        List<AtlasObjectId> processExecutions = toAtlasObjectIdList(processEntity1.getRelationshipAttribute(
+            BaseImpalaEvent.ATTRIBUTE_PROCESS_EXECUTIONS));
+        Assert.assertNotNull(processExecutions, "process " + processEntity1.getGuid() + " has no process executions");
+        Assert.assertEquals(processExecutions.size(), 2);
+
+        // both runs executed the same query, so every execution must carry the same query text
+        for (AtlasObjectId processExecutionId : processExecutions) {
+            AtlasEntity processExecutionEntity = atlasClientV2.getEntityByGuid(processExecutionId.getGuid()).getEntity();
+            String entityQueryText =
+                String.valueOf(processExecutionEntity.getAttribute(ATTRIBUTE_QUERY_TEXT)).toLowerCase().trim();
+
+            Assert.assertEquals(entityQueryText, queryString,
+                "unexpected query text for process execution " + processExecutionId.getGuid());
+        }
}
}
\ No newline at end of file
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
index 86801e3..6156208 100644
---
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
@@ -28,6 +28,9 @@ import org.apache.atlas.impala.model.ImpalaVertexType;
import org.apache.atlas.impala.model.LineageEdge;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -135,7 +138,12 @@ public class ImpalaLineageHookIT extends
ImpalaLineageITBase {
processQFName = processQFName.toLowerCase();
- assertProcessIsRegistered(processQFName, queryObj.getQueryText());
+ AtlasEntity processEntity1 = validateProcess(processQFName,
queryObj.getQueryText());
+ AtlasEntity processExecutionEntity1 =
validateProcessExecution(processEntity1, queryObj.getQueryText());
+ AtlasObjectId process1 =
toAtlasObjectId(processExecutionEntity1.getRelationshipAttribute(
+ BaseImpalaEvent.ATTRIBUTE_PROCESS));
+ Assert.assertEquals(process1.getGuid(), processEntity1.getGuid());
+ Assert.assertEquals(numberOfProcessExecutions(processEntity1), 1);
} catch (Exception ex) {
LOG.error("process create_view failed: ", ex);
assertFalse(true);
diff --git
a/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect1.json
b/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect1.json
new file mode 100644
index 0000000..4e27837
--- /dev/null
+++
b/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect1.json
@@ -0,0 +1,83 @@
+{
+ "queryText":"insert into table db_6.table_2 (count, id) select count, id
from db_6.table_1",
+ "queryId":"3a441d0c130962f8:7f634aec00000000",
+ "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+ "user":"admin",
+ "timestamp":1554750072,
+ "endTime":1554750554,
+ "edges":[
+ {
+ "sources":[
+ 1
+ ],
+ "targets":[
+ 0
+ ],
+ "edgeType":"PROJECTION"
+ },
+ {
+ "sources":[
+ 3
+ ],
+ "targets":[
+ 2
+ ],
+ "edgeType":"PROJECTION"
+ },
+ {
+ "sources":[
+ ],
+ "targets":[
+ 4
+ ],
+ "edgeType":"PROJECTION"
+ }
+ ],
+ "vertices":[
+ {
+ "id":0,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_2.count",
+ "metadata": {
+ "tableName": "db_6.table_2",
+ "tableCreateTime": 1554750072
+ }
+ },
+ {
+ "id":1,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_1.count",
+ "metadata": {
+ "tableName": "db_6.table_1",
+ "tableCreateTime": 1554750070
+ }
+ },
+ {
+ "id":2,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_2.id",
+ "metadata": {
+ "tableName": "db_6.table_2",
+ "tableCreateTime": 1554750072
+ }
+ },
+ {
+ "id":3,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_1.id",
+ "metadata": {
+ "tableName": "db_6.table_1",
+ "tableCreateTime": 1554750070
+ }
+ },
+ {
+ "id":4,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_2.int_col",
+ "metadata": {
+ "tableName": "db_6.table_2",
+ "tableCreateTime": 1554750072
+ }
+ }
+ ]
+}
diff --git
a/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect2.json
b/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect2.json
new file mode 100644
index 0000000..ece6535
--- /dev/null
+++
b/addons/impala-bridge/src/test/resources/impalaMultipleInsertIntoAsSelect2.json
@@ -0,0 +1,83 @@
+{
+ "queryText":"insert into table db_6.table_2 (count, id) select count, id
from db_6.table_1",
+ "queryId":"3a441d0c130962f8:7f634aec00000000",
+ "hash":"64ff0425ccdfaada53e3f2fd76f566f7",
+ "user":"admin",
+ "timestamp":1554750082,
+ "endTime":1554750584,
+ "edges":[
+ {
+ "sources":[
+ 1
+ ],
+ "targets":[
+ 0
+ ],
+ "edgeType":"PROJECTION"
+ },
+ {
+ "sources":[
+ 3
+ ],
+ "targets":[
+ 2
+ ],
+ "edgeType":"PROJECTION"
+ },
+ {
+ "sources":[
+ ],
+ "targets":[
+ 4
+ ],
+ "edgeType":"PROJECTION"
+ }
+ ],
+ "vertices":[
+ {
+ "id":0,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_2.count",
+ "metadata": {
+ "tableName": "db_6.table_2",
+ "tableCreateTime": 1554750072
+ }
+ },
+ {
+ "id":1,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_1.count",
+ "metadata": {
+ "tableName": "db_6.table_1",
+ "tableCreateTime": 1554750070
+ }
+ },
+ {
+ "id":2,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_2.id",
+ "metadata": {
+ "tableName": "db_6.table_2",
+ "tableCreateTime": 1554750072
+ }
+ },
+ {
+ "id":3,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_1.id",
+ "metadata": {
+ "tableName": "db_6.table_1",
+ "tableCreateTime": 1554750070
+ }
+ },
+ {
+ "id":4,
+ "vertexType":"COLUMN",
+ "vertexId":"db_6.table_2.int_col",
+ "metadata": {
+ "tableName": "db_6.table_2",
+ "tableCreateTime": 1554750072
+ }
+ }
+ ]
+}
\ No newline at end of file