Repository: atlas Updated Branches: refs/heads/master f15995cc8 -> f62ed0926
ATLAS-2649: updated Hive Hook to create lineage between HBase table and Hive table Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/f62ed092 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/f62ed092 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/f62ed092 Branch: refs/heads/master Commit: f62ed0926dc5a116729d4cb4d2f6d97fac3e8232 Parents: f15995c Author: rmani <[email protected]> Authored: Thu May 10 15:10:20 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Thu May 10 22:54:03 2018 -0700 ---------------------------------------------------------------------- addons/hive-bridge/pom.xml | 5 + .../atlas/hive/hook/events/BaseHiveEvent.java | 97 +++++++++++++++++++- .../atlas/hive/hook/events/CreateTable.java | 27 +++++- 3 files changed, 122 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/f62ed092/addons/hive-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index 0d4ab2c..5814854 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -295,6 +295,11 @@ <version>${hbase.version}</version> </artifactItem> <artifactItem> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + </artifactItem> + <artifactItem> <groupId>com.sun.jersey</groupId> <artifactId>jersey-json</artifactId> <version>${jersey.version}</version> http://git-wip-us.apache.org/repos/asf/atlas/blob/f62ed092/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index ca13812..fad53c0 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -28,6 +28,7 @@ import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.utils.HdfsNameServiceResolver; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Database; @@ -75,6 +76,8 @@ public abstract class BaseHiveEvent { public static final String HIVE_TYPE_SERDE = "hive_serde"; public static final String HIVE_TYPE_ORDER = "hive_order"; public static final String HDFS_TYPE_PATH = "hdfs_path"; + public static final String HBASE_TYPE_TABLE = "hbase_table"; + public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace"; public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; public static final String ATTRIBUTE_NAME = "name"; @@ -124,9 +127,15 @@ public abstract class BaseHiveEvent { public static final String ATTRIBUTE_DEPENDENCY_TYPE = "depenendencyType"; public static final String ATTRIBUTE_EXPRESSION = "expression"; public static final String ATTRIBUTE_ALIASES = "aliases"; + public static final String ATTRIBUTE_URI = "uri"; + public static final String ATTRIBUTE_STORAGE_HANDLER = "storage_handler"; + public static final String ATTRIBUTE_NAMESPACE = "namespace"; - - public static final long MILLIS_CONVERT_FACTOR = 1000; + public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler"; + public static final String HBASE_DEFAULT_NAMESPACE = "default"; + public static final String HBASE_NAMESPACE_TABLE_DELIMITER = ":"; + public static final String HBASE_PARAM_TABLE_NAME = "hbase.table.name"; + public static final long MILLIS_CONVERT_FACTOR = 1000; public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>(); @@ -156,6 +165,10 @@ public abstract class BaseHiveEvent { return table.getTTable() != null ? (table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR) : System.currentTimeMillis(); } + public static String getTableOwner(Table table) { + return table.getTTable() != null ? (table.getOwner()): ""; + } + public static AtlasObjectId getObjectId(AtlasEntity entity) { String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); @@ -694,6 +707,53 @@ public abstract class BaseHiveEvent { return sb.toString(); } + protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) { + AtlasEntity ret = null; + HBaseTableInfo hBaseTableInfo = new HBaseTableInfo(table); + String hbaseNameSpace = hBaseTableInfo.getHbaseNameSpace(); + String hbaseTableName = hBaseTableInfo.getHbaseTableName(); + + if (hbaseTableName != null) { + AtlasEntity nsEntity = new AtlasEntity(HBASE_TYPE_NAMESPACE); + nsEntity.setAttribute(ATTRIBUTE_NAME, hbaseNameSpace); + nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); + nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseNameSpaceQualifiedName(getClusterName(), hbaseNameSpace)); + + ret = new AtlasEntity(HBASE_TYPE_TABLE); + + ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName); + ret.setAttribute(ATTRIBUTE_URI, hbaseTableName); + ret.setAttribute(ATTRIBUTE_NAMESPACE, getObjectId(nsEntity)); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName)); + + entities.addReferredEntity(nsEntity); + entities.addEntity(ret); + } + + return ret; + } + + protected boolean isHBaseStore(Table table) { + boolean ret = false; + Map<String, String> parameters = table.getParameters(); + + if (MapUtils.isNotEmpty(parameters)) { + String storageHandler = parameters.get(ATTRIBUTE_STORAGE_HANDLER); + + ret = (storageHandler != null && storageHandler.equals(HBASE_STORAGE_HANDLER_CLASS)); + } + + return ret; + } + + private static String getHBaseTableQualifiedName(String clusterName, String nameSpace, String tableName) { + return String.format("%s:%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName); + } + + private static String getHBaseNameSpaceQualifiedName(String clusterName, String nameSpace) { + return String.format("%s@%s", nameSpace.toLowerCase(), clusterName); + } + private boolean ignoreHDFSPathsinProcessQualifiedName() { switch (context.getHiveOperation()) { case LOAD: @@ -831,4 +891,37 @@ public abstract class BaseHiveEvent { } static final Comparator<Entity> entityComparator = new EntityComparator(); + + static final class HBaseTableInfo { + String hbaseNameSpace = null; + String hbaseTableName = null; + + HBaseTableInfo(Table table) { + Map<String, String> parameters = table.getParameters(); + + if (MapUtils.isNotEmpty(parameters)) { + hbaseNameSpace = HBASE_DEFAULT_NAMESPACE; + hbaseTableName = parameters.get(HBASE_PARAM_TABLE_NAME); + + if (hbaseTableName != null) { + if (hbaseTableName.contains(HBASE_NAMESPACE_TABLE_DELIMITER)) { + String[] hbaseTableInfo = hbaseTableName.split(HBASE_NAMESPACE_TABLE_DELIMITER); + + if (hbaseTableInfo.length > 1) { + hbaseNameSpace = hbaseTableInfo[0]; + hbaseTableName = hbaseTableInfo[1]; + } + } + } + } + } + + public String getHbaseNameSpace() { + return hbaseNameSpace; + } + + public String getHbaseTableName() { + return hbaseTableName; + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/f62ed092/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java index 1c072e9..daf5c86 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java @@ -81,12 +81,29 @@ public class CreateTable extends BaseHiveEvent { if (table != null) { AtlasEntity tblEntity = toTableEntity(table, ret); - if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation()); - AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity)); + if (isHBaseStore(table)) { + // This create lineage to HBase table in case of Hive on HBase + AtlasEntity hbaseTableEntity = toReferencedHBaseTable(table, ret); - ret.addEntity(processEntity); - ret.addReferredEntity(hdfsPathEntity); + if (hbaseTableEntity != null) { + final AtlasEntity processEntity; + + if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity)); + } else { + processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity)); + } + + ret.addEntity(processEntity); + } + } else { + if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + AtlasEntity hdfsPathEntity = getHDFSPathEntity(table.getDataLocation()); + AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity)); + + ret.addEntity(processEntity); + ret.addReferredEntity(hdfsPathEntity); + } } }
