This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit b587dee9d12118173a231fa04e76690d0694ecb3 Author: Sarath Subramanian <[email protected]> AuthorDate: Thu Jul 11 16:41:58 2019 -0700 ATLAS-3321: Introduce atlas metadata namespace (cherry picked from commit ef4025160dd456f3ac67ef39551ba280925a6a94) --- addons/hbase-bridge/src/bin/import-hbase.sh | 2 +- .../apache/atlas/hbase/bridge/HBaseAtlasHook.java | 75 +++++++--------- .../org/apache/atlas/hbase/bridge/HBaseBridge.java | 85 ++++++++++-------- addons/hive-bridge/src/bin/import-hive.sh | 2 +- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 100 ++++++++++++--------- .../atlas/hive/hook/AtlasHiveHookContext.java | 20 ++--- .../java/org/apache/atlas/hive/hook/HiveHook.java | 9 -- .../atlas/hive/hook/events/BaseHiveEvent.java | 61 +++++++------ .../java/org/apache/atlas/hive/HiveITBase.java | 2 +- .../atlas/hive/bridge/HiveMetaStoreBridgeTest.java | 47 +++++----- .../atlas/impala/hook/AtlasImpalaHookContext.java | 17 ++-- .../atlas/impala/hook/ImpalaLineageHook.java | 16 +--- .../atlas/impala/hook/events/BaseImpalaEvent.java | 2 +- .../apache/atlas/impala/ImpalaLineageITBase.java | 4 +- .../apache/atlas/impala/ImpalaLineageToolIT.java | 18 ++-- .../atlas/impala/hook/ImpalaLineageHookIT.java | 2 +- addons/kafka-bridge/src/bin/import-kafka.sh | 2 +- .../org/apache/atlas/kafka/bridge/KafkaBridge.java | 55 +++++++----- .../org/apache/atlas/sqoop/hook/SqoopHook.java | 40 +++++---- .../apache/atlas/storm/hook/StormAtlasHook.java | 57 +++++------- .../main/java/org/apache/atlas/AtlasConstants.java | 20 ++--- .../main/java/org/apache/atlas/hook/AtlasHook.java | 18 +++- 22 files changed, 336 insertions(+), 318 deletions(-) diff --git a/addons/hbase-bridge/src/bin/import-hbase.sh b/addons/hbase-bridge/src/bin/import-hbase.sh index b9c1a4b..34c4180 100644 --- a/addons/hbase-bridge/src/bin/import-hbase.sh +++ b/addons/hbase-bridge/src/bin/import-hbase.sh @@ -114,7 +114,7 @@ else exit 1 fi -CP="${ATLASCPPATH}:${HBASE_CP}:${HADOOP_CP}" +CP="${HBASE_CP}:${HADOOP_CP}:${ATLASCPPATH}" # If running in cygwin, convert pathnames and classpath to Windows format. if [ "${CYGWIN}" == "true" ] diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java index 1825cd2..6d062e2 100644 --- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java @@ -30,7 +30,6 @@ import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.configuration.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -55,20 +54,17 @@ public class HBaseAtlasHook extends AtlasHook { private static final Logger LOG = LoggerFactory.getLogger(HBaseAtlasHook.class); - public static final String HBASE_CLUSTER_NAME = "atlas.cluster.name"; - public static final String DEFAULT_CLUSTER_NAME = "primary"; - public static final String ATTR_DESCRIPTION = "description"; - public static final String ATTR_ATLAS_ENDPOINT = "atlas.rest.address"; - public static final String ATTR_COMMENT = "comment"; - public static final String ATTR_PARAMETERS = "parameters"; - public static final String ATTR_URI = "uri"; - public static final String ATTR_NAMESPACE = "namespace"; - public static final String ATTR_TABLE = "table"; - public static final String ATTR_COLUMNFAMILIES = "column_families"; - public static final String ATTR_CREATE_TIME = "createTime"; - public static final String ATTR_MODIFIED_TIME = "modifiedTime"; - public static final String ATTR_OWNER = "owner"; - public static final String ATTR_NAME = "name"; + public static final String ATTR_DESCRIPTION = "description"; + public static final String ATTR_ATLAS_ENDPOINT = "atlas.rest.address"; + public static final String ATTR_PARAMETERS = "parameters"; + public static final String ATTR_URI = "uri"; + public static final String ATTR_NAMESPACE = "namespace"; + public static final String ATTR_TABLE = "table"; + public static final String ATTR_COLUMNFAMILIES = "column_families"; + public static final String ATTR_CREATE_TIME = "createTime"; + public static final String ATTR_MODIFIED_TIME = "modifiedTime"; + public static final String ATTR_OWNER = "owner"; + public static final String ATTR_NAME = "name"; // column addition metadata public static final String ATTR_TABLE_MAX_FILESIZE = "maxFileSize"; @@ -106,7 +102,6 @@ public class HBaseAtlasHook extends AtlasHook { public static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT = "%s:%s.%s@%s"; private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName"; - private String clusterName = null; private static volatile HBaseAtlasHook me; @@ -141,7 +136,7 @@ public class HBaseAtlasHook extends AtlasHook { ret = me; if (ret == null) { - me = ret = new HBaseAtlasHook(atlasProperties); + me = ret = new HBaseAtlasHook(); } } } catch (Exception e) { @@ -152,15 +147,9 @@ public class HBaseAtlasHook extends AtlasHook { return ret; } - public HBaseAtlasHook(Configuration atlasProperties) { - this(atlasProperties.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME)); + public HBaseAtlasHook() { } - public HBaseAtlasHook(String clusterName) { - this.clusterName = clusterName; - } - - public void createAtlasInstances(HBaseOperationContext hbaseOperationContext) { OPERATION operation = hbaseOperationContext.getOperation(); @@ -210,7 +199,7 @@ public class HBaseAtlasHook extends AtlasHook { } private void deleteNameSpaceInstance(HBaseOperationContext hbaseOperationContext) { - String nameSpaceQName = getNameSpaceQualifiedName(clusterName, hbaseOperationContext.getNameSpace()); + String nameSpaceQName = getNameSpaceQualifiedName(getMetadataNamespace(), hbaseOperationContext.getNameSpace()); AtlasObjectId nameSpaceId = new AtlasObjectId(HBaseDataTypes.HBASE_NAMESPACE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, nameSpaceQName); LOG.info("Delete NameSpace {}", nameSpaceQName); @@ -259,7 +248,7 @@ public class HBaseAtlasHook extends AtlasHook { } String tableNameStr = tableName.getNameAsString(); - String tableQName = getTableQualifiedName(clusterName, nameSpaceName, tableNameStr); + String tableQName = getTableQualifiedName(getMetadataNamespace(), nameSpaceName, tableNameStr); AtlasObjectId tableId = new AtlasObjectId(HBaseDataTypes.HBASE_TABLE.getName(), REFERENCEABLE_ATTRIBUTE_NAME, tableQName); LOG.info("Delete Table {}", tableQName); @@ -302,7 +291,7 @@ public class HBaseAtlasHook extends AtlasHook { String tableNameStr = tableName.getNameAsString(); String columnFamilyName = hbaseOperationContext.getColummFamily(); - String columnFamilyQName = getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableNameStr, columnFamilyName); + String columnFamilyQName = getColumnFamilyQualifiedName(getMetadataNamespace(), nameSpaceName, tableNameStr, columnFamilyName); AtlasObjectId columnFamilyId = new AtlasObjectId(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(), REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQName); LOG.info("Delete ColumnFamily {}", columnFamilyQName); @@ -314,48 +303,48 @@ public class HBaseAtlasHook extends AtlasHook { /** * Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas. * - * @param clusterName Name of the cluster to which the HBase component belongs + * @param metadataNamespace Metadata namespace of the cluster to which the HBase component belongs * @param nameSpace Name of the HBase database to which the Table belongs * @param tableName Name of the HBase table * @param columnFamily Name of the ColumnFamily * @return Unique qualified name to identify the Table instance in Atlas. */ - public static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) { - if (clusterName == null || nameSpace == null || tableName == null || columnFamily == null) { + public static String getColumnFamilyQualifiedName(String metadataNamespace, String nameSpace, String tableName, String columnFamily) { + if (metadataNamespace == null || nameSpace == null || tableName == null || columnFamily == null) { return null; } else { - return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), columnFamily.toLowerCase(), clusterName); + return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), columnFamily.toLowerCase(), metadataNamespace); } } /** * Construct the qualified name used to uniquely identify a Table instance in Atlas. * - * @param clusterName Name of the cluster to which the HBase component belongs + * @param metadataNamespace Metadata namespace of the cluster to which the HBase component belongs * @param nameSpace Name of the HBase database to which the Table belongs * @param tableName Name of the HBase table * @return Unique qualified name to identify the Table instance in Atlas. */ - public static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) { - if (clusterName == null || nameSpace == null || tableName == null) { + public static String getTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) { + if (metadataNamespace == null || nameSpace == null || tableName == null) { return null; } else { - return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), clusterName); + return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), metadataNamespace); } } /** * Construct the qualified name used to uniquely identify a HBase NameSpace instance in Atlas. * - * @param clusterName Name of the cluster to which the HBase component belongs + * @param metadataNamespace Metadata namespace of the cluster to which the HBase component belongs * @param nameSpace * @return Unique qualified name to identify the HBase NameSpace instance in Atlas. */ - public static String getNameSpaceQualifiedName(String clusterName, String nameSpace) { - if (clusterName == null || nameSpace == null) { + public static String getNameSpaceQualifiedName(String metadataNamespace, String nameSpace) { + if (metadataNamespace == null || nameSpace == null) { return null; } else { - return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), clusterName); + return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), metadataNamespace); } } @@ -375,8 +364,8 @@ public class HBaseAtlasHook extends AtlasHook { Date now = new Date(System.currentTimeMillis()); nameSpace.setAttribute(ATTR_NAME, nameSpaceName); - nameSpace.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(clusterName, nameSpaceName)); - nameSpace.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); + nameSpace.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME, getNameSpaceQualifiedName(getMetadataNamespace(), nameSpaceName)); + nameSpace.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getMetadataNamespace()); nameSpace.setAttribute(ATTR_DESCRIPTION, nameSpaceName); nameSpace.setAttribute(ATTR_PARAMETERS, hbaseOperationContext.getHbaseConf()); nameSpace.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner()); @@ -393,7 +382,7 @@ public class HBaseAtlasHook extends AtlasHook { AtlasEntity table = new AtlasEntity(HBaseDataTypes.HBASE_TABLE.getName()); String tableName = getTableName(hbaseOperationContext); String nameSpaceName = (String) nameSpace.getAttribute(ATTR_NAME); - String tableQName = getTableQualifiedName(clusterName, nameSpaceName, tableName); + String tableQName = getTableQualifiedName(getMetadataNamespace(), nameSpaceName, tableName); OPERATION operation = hbaseOperationContext.getOperation(); Date now = new Date(System.currentTimeMillis()); @@ -455,7 +444,7 @@ public class HBaseAtlasHook extends AtlasHook { String columnFamilyName = columnFamilyDescriptor.getNameAsString(); String tableName = (String) table.getAttribute(ATTR_NAME); String nameSpaceName = (String) nameSpace.getAttribute(ATTR_NAME); - String columnFamilyQName = getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableName, columnFamilyName); + String columnFamilyQName = getColumnFamilyQualifiedName(getMetadataNamespace(), nameSpaceName, tableName, columnFamilyName); Date now = new Date(System.currentTimeMillis()); columnFamily.setAttribute(ATTR_NAME, columnFamilyName); diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java index 1765c18..4a4b4d9 100644 --- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java @@ -65,21 +65,22 @@ import java.util.regex.Pattern; public class HBaseBridge { private static final Logger LOG = LoggerFactory.getLogger(HBaseBridge.class); - private static final int EXIT_CODE_SUCCESS = 0; - private static final int EXIT_CODE_FAILED = 1; - private static final String ATLAS_ENDPOINT = "atlas.rest.address"; - private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; - private static final String HBASE_CLUSTER_NAME = "atlas.cluster.name"; - private static final String DEFAULT_CLUSTER_NAME = "primary"; - private static final String QUALIFIED_NAME = "qualifiedName"; - private static final String NAME = "name"; - private static final String URI = "uri"; - private static final String OWNER = "owner"; - private static final String DESCRIPTION_ATTR = "description"; - private static final String CLUSTERNAME = "clusterName"; - private static final String NAMESPACE = "namespace"; - private static final String TABLE = "table"; - private static final String COLUMN_FAMILIES = "column_families"; + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_FAILED = 1; + private static final String ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + private static final String DEFAULT_CLUSTER_NAME = "primary"; + private static final String HBASE_METADATA_NAMESPACE = "atlas.metadata.namespace"; + private static final String QUALIFIED_NAME = "qualifiedName"; + private static final String NAME = "name"; + private static final String URI = "uri"; + private static final String OWNER = "owner"; + private static final String DESCRIPTION_ATTR = "description"; + private static final String CLUSTERNAME = "clusterName"; + private static final String NAMESPACE = "namespace"; + private static final String TABLE = "table"; + private static final String COLUMN_FAMILIES = "column_families"; // table metadata private static final String ATTR_TABLE_MAX_FILESIZE = "maxFileSize"; @@ -115,9 +116,9 @@ public class HBaseBridge { private static final String HBASE_TABLE_QUALIFIED_NAME_FORMAT = "%s:%s@%s"; private static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT = "%s:%s.%s@%s"; - private final String clusterName; - private final AtlasClientV2 atlasClientV2; - private final Admin hbaseAdmin; + private final String metadataNamespace; + private final AtlasClientV2 atlasClientV2; + private final Admin hbaseAdmin; public static void main(String[] args) { @@ -204,8 +205,8 @@ public class HBaseBridge { } public HBaseBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception { - this.atlasClientV2 = atlasClientV2; - this.clusterName = atlasConf.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + this.atlasClientV2 = atlasClientV2; + this.metadataNamespace = getMetadataNamespace(atlasConf); org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); @@ -220,6 +221,14 @@ public class HBaseBridge { hbaseAdmin = conn.getAdmin(); } + private String getMetadataNamespace(Configuration config) { + return config.getString(HBASE_METADATA_NAMESPACE, getClusterName(config)); + } + + private String getClusterName(Configuration config) { + return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME); + } + private boolean importHBaseEntities(String namespaceToImport, String tableToImport) throws Exception { boolean ret = false; @@ -367,7 +376,7 @@ public class HBaseBridge { protected AtlasEntityWithExtInfo createOrUpdateNameSpace(NamespaceDescriptor namespaceDescriptor) throws Exception { String nsName = namespaceDescriptor.getName(); - String nsQualifiedName = getNameSpaceQualifiedName(clusterName, nsName); + String nsQualifiedName = getNameSpaceQualifiedName(metadataNamespace, nsName); AtlasEntityWithExtInfo nsEntity = findNameSpaceEntityInAtlas(nsQualifiedName); if (nsEntity == null) { @@ -390,7 +399,7 @@ public class HBaseBridge { protected AtlasEntityWithExtInfo createOrUpdateTable(String nameSpace, String tableName, AtlasEntity nameSapceEntity, TableDescriptor htd, ColumnFamilyDescriptor[] hcdts) throws Exception { String owner = htd.getOwnerString(); - String tblQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName); + String tblQualifiedName = getTableQualifiedName(metadataNamespace, nameSpace, tableName); AtlasEntityWithExtInfo ret = findTableEntityInAtlas(tblQualifiedName); if (ret == null) { @@ -436,7 +445,7 @@ public class HBaseBridge { for (ColumnFamilyDescriptor columnFamilyDescriptor : hcdts) { String cfName = columnFamilyDescriptor.getNameAsString(); - String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName); + String cfQualifiedName = getColumnFamilyQualifiedName(metadataNamespace, nameSpace, tableName, cfName); AtlasEntityWithExtInfo cfEntity = findColumnFamiltyEntityInAtlas(cfQualifiedName); if (cfEntity == null) { @@ -516,10 +525,10 @@ public class HBaseBridge { ret = nsEtity; } - String qualifiedName = getNameSpaceQualifiedName(clusterName, nameSpace); + String qualifiedName = getNameSpaceQualifiedName(metadataNamespace, nameSpace); ret.setAttribute(QUALIFIED_NAME, qualifiedName); - ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(CLUSTERNAME, metadataNamespace); ret.setAttribute(NAME, nameSpace); ret.setAttribute(DESCRIPTION_ATTR, nameSpace); @@ -535,10 +544,10 @@ public class HBaseBridge { ret = atlasEntity; } - String tableQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName); + String tableQualifiedName = getTableQualifiedName(metadataNamespace, nameSpace, tableName); ret.setAttribute(QUALIFIED_NAME, tableQualifiedName); - ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(CLUSTERNAME, metadataNamespace); ret.setAttribute(NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpaceEntity)); ret.setAttribute(NAME, tableName); ret.setAttribute(DESCRIPTION_ATTR, tableName); @@ -564,10 +573,10 @@ public class HBaseBridge { } String cfName = hcdt.getNameAsString(); - String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName); + String cfQualifiedName = getColumnFamilyQualifiedName(metadataNamespace, nameSpace, tableName, cfName); ret.setAttribute(QUALIFIED_NAME, cfQualifiedName); - ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(CLUSTERNAME, metadataNamespace); ret.setAttribute(TABLE, tableId); ret.setAttribute(NAME, cfName); ret.setAttribute(DESCRIPTION_ATTR, cfName); @@ -637,37 +646,37 @@ public class HBaseBridge { /** * Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas. - * @param clusterName Name of the cluster to which the Hbase component belongs + * @param metadataNamespace Metadata namespace of the cluster to which the Hbase component belongs * @param nameSpace Name of the Hbase database to which the Table belongs * @param tableName Name of the Hbase table * @param columnFamily Name of the ColumnFamily * @return Unique qualified name to identify the Table instance in Atlas. */ - private static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) { + private static String getColumnFamilyQualifiedName(String metadataNamespace, String nameSpace, String tableName, String columnFamily) { tableName = stripNameSpace(tableName.toLowerCase()); - return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(), clusterName); + return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(), metadataNamespace); } /** * Construct the qualified name used to uniquely identify a Table instance in Atlas. - * @param clusterName Name of the cluster to which the Hbase component belongs + * @param metadataNamespace Metadata namespace of the cluster to which the Hbase component belongs * @param nameSpace Name of the Hbase database to which the Table belongs * @param tableName Name of the Hbase table * @return Unique qualified name to identify the Table instance in Atlas. */ - private static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) { + private static String getTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) { tableName = stripNameSpace(tableName.toLowerCase()); - return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, clusterName); + return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, metadataNamespace); } /** * Construct the qualified name used to uniquely identify a Hbase NameSpace instance in Atlas. - * @param clusterName Name of the cluster to which the Hbase component belongs + * @param metadataNamespace Metadata namespace of the cluster to which the Hbase component belongs * @param nameSpace Name of the NameSpace * @return Unique qualified name to identify the HBase NameSpace instance in Atlas. */ - private static String getNameSpaceQualifiedName(String clusterName, String nameSpace) { - return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), clusterName); + private static String getNameSpaceQualifiedName(String metadataNamespace, String nameSpace) { + return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), metadataNamespace); } private static String stripNameSpace(String tableName){ diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh index 38ec3be..b1660ea 100755 --- a/addons/hive-bridge/src/bin/import-hive.sh +++ b/addons/hive-bridge/src/bin/import-hive.sh @@ -109,7 +109,7 @@ else exit 1 fi -CP="${ATLASCPPATH}:${HIVE_CP}:${HADOOP_CP}" +CP="${HIVE_CP}:${HADOOP_CP}:${ATLASCPPATH}" # If running in cygwin, convert pathnames and classpath to Windows format. if [ "${CYGWIN}" == "true" ] diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 5f8f846..049112b 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -83,7 +83,8 @@ public class HiveMetaStoreBridge { private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class); public static final String CONF_PREFIX = "atlas.hook.hive."; - public static final String HIVE_CLUSTER_NAME = "atlas.cluster.name"; + public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + public static final String HIVE_METADATA_NAMESPACE = "atlas.metadata.namespace"; public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String TEMP_TABLE_PREFIX = "_temp-"; @@ -95,10 +96,10 @@ public class HiveMetaStoreBridge { private static final int EXIT_CODE_FAILED = 1; private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; - private final String clusterName; - private final Hive hiveClient; - private final AtlasClientV2 atlasClientV2; - private final boolean convertHdfsPathToLowerCase; + private final String metadataNamespace; + private final Hive hiveClient; + private final AtlasClientV2 atlasClientV2; + private final boolean convertHdfsPathToLowerCase; public static void main(String[] args) { @@ -209,7 +210,10 @@ public class HiveMetaStoreBridge { * @param hiveConf {@link HiveConf} for Hive component in the cluster */ public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf hiveConf, AtlasClientV2 atlasClientV2) throws Exception { - this(atlasProperties.getString(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2, atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false)); + this.metadataNamespace = getMetadataNamespace(atlasProperties); + this.hiveClient = Hive.get(hiveConf); + this.atlasClientV2 = atlasClientV2; + this.convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false); } /** @@ -220,19 +224,27 @@ public class HiveMetaStoreBridge { this(atlasProperties, hiveConf, null); } - HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2) { - this(clusterName, hiveClient, atlasClientV2, true); + HiveMetaStoreBridge(String metadataNamespace, Hive hiveClient, AtlasClientV2 atlasClientV2) { + this(metadataNamespace, hiveClient, atlasClientV2, true); } - HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) { - this.clusterName = clusterName; + HiveMetaStoreBridge(String metadataNamespace, Hive hiveClient, AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) { + this.metadataNamespace = metadataNamespace; this.hiveClient = hiveClient; this.atlasClientV2 = atlasClientV2; this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase; } - public String getClusterName() { - return clusterName; + public String getMetadataNamespace(Configuration config) { + return config.getString(HIVE_METADATA_NAMESPACE, getClusterName(config)); + } + + private String getClusterName(Configuration config) { + return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME); + } + + public String getMetadataNamespace() { + return metadataNamespace; } public Hive getHiveClient() { @@ -337,7 +349,7 @@ public class HiveMetaStoreBridge { AtlasEntityWithExtInfo tableEntity = registerTable(dbEntity, table); if (table.getTableType() == TableType.EXTERNAL_TABLE) { - String processQualifiedName = getTableProcessQualifiedName(clusterName, table); + String processQualifiedName = getTableProcessQualifiedName(metadataNamespace, table); AtlasEntityWithExtInfo processEntity = findProcessEntity(processQualifiedName); if (processEntity == null) { @@ -350,7 +362,7 @@ public class HiveMetaStoreBridge { processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME, processQualifiedName); processInst.setAttribute(ATTRIBUTE_NAME, query); - processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName); + processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); processInst.setAttribute(ATTRIBUTE_INPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(pathInst))); processInst.setAttribute(ATTRIBUTE_OUTPUTS, Collections.singletonList(BaseHiveEvent.getObjectId(tableInst))); processInst.setAttribute(ATTRIBUTE_USER_NAME, table.getOwner()); @@ -396,7 +408,7 @@ public class HiveMetaStoreBridge { Database db = hiveClient.getDatabase(databaseName); if (db != null) { - ret = findDatabase(clusterName, databaseName); + ret = findDatabase(metadataNamespace, databaseName); if (ret == null) { ret = registerInstance(new AtlasEntityWithExtInfo(toDbEntity(db))); @@ -542,12 +554,12 @@ public class HiveMetaStoreBridge { String dbName = hiveDB.getName().toLowerCase(); - dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getDBQualifiedName(clusterName, dbName)); + dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getDBQualifiedName(metadataNamespace, dbName)); dbEntity.setAttribute(ATTRIBUTE_NAME, dbName); dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription()); dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName()); - dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName); + dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); dbEntity.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri())); dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters()); @@ -574,7 +586,7 @@ public class HiveMetaStoreBridge { } AtlasEntity tableEntity = table.getEntity(); - String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable); + String tableQualifiedName = getTableQualifiedName(metadataNamespace, hiveTable); long createTime = BaseHiveEvent.getTableCreateTime(hiveTable); long lastAccessTime = hiveTable.getLastAccessTime() > 0 ? hiveTable.getLastAccessTime() : createTime; @@ -705,7 +717,7 @@ public class HiveMetaStoreBridge { Path path = new Path(pathUri); ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString()); - ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName); + ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); if (StringUtils.isNotEmpty(nameServiceID)) { // Name service resolution is successful, now get updated HDFS path where the host port info is replaced by resolved name service @@ -717,7 +729,7 @@ public class HiveMetaStoreBridge { } else { ret.setAttribute(ATTRIBUTE_PATH, pathUri); - // Only append clusterName for the HDFS path + // Only append metadataNamespace for the HDFS path if (pathUri.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) { ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHdfsPathQualifiedName(pathUri)); } else { @@ -731,18 +743,18 @@ public class HiveMetaStoreBridge { /** * Gets the atlas entity for the database * @param databaseName database Name - * @param clusterName cluster name + * @param metadataNamespace cluster name * @return AtlasEntity for database if exists, else null * @throws Exception */ - private AtlasEntityWithExtInfo findDatabase(String clusterName, String databaseName) throws Exception { + private AtlasEntityWithExtInfo findDatabase(String metadataNamespace, String databaseName) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Searching Atlas for database {}", databaseName); } String typeName = HiveDataTypes.HIVE_DB.getName(); - return findEntity(typeName, getDBQualifiedName(clusterName, databaseName)); + return findEntity(typeName, getDBQualifiedName(metadataNamespace, databaseName)); } /** @@ -758,7 +770,7 @@ public class HiveMetaStoreBridge { } String typeName = HiveDataTypes.HIVE_TABLE.getName(); - String tblQualifiedName = getTableQualifiedName(getClusterName(), hiveTable.getDbName(), hiveTable.getTableName()); + String tblQualifiedName = getTableQualifiedName(getMetadataNamespace(), hiveTable.getDbName(), hiveTable.getTableName()); return findEntity(typeName, tblQualifiedName); } @@ -822,37 +834,37 @@ public class HiveMetaStoreBridge { /** * Construct the qualified name used to uniquely identify a Table instance in Atlas. - * @param clusterName Name of the cluster to which the Hive component belongs + * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs * @param table hive table for which the qualified name is needed * @return Unique qualified name to identify the Table instance in Atlas. */ - private static String getTableQualifiedName(String clusterName, Table table) { - return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary()); + private static String getTableQualifiedName(String metadataNamespace, Table table) { + return getTableQualifiedName(metadataNamespace, table.getDbName(), table.getTableName(), table.isTemporary()); } private String getHdfsPathQualifiedName(String hdfsPath) { - return String.format("%s@%s", hdfsPath, clusterName); + return String.format("%s@%s", hdfsPath, metadataNamespace); } /** * Construct the qualified name used to uniquely identify a Database instance in Atlas. - * @param clusterName Name of the cluster to which the Hive component belongs + * @param metadataNamespace Name of the cluster to which the Hive component belongs * @param dbName Name of the Hive database * @return Unique qualified name to identify the Database instance in Atlas. */ - public static String getDBQualifiedName(String clusterName, String dbName) { - return String.format("%s@%s", dbName.toLowerCase(), clusterName); + public static String getDBQualifiedName(String metadataNamespace, String dbName) { + return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace); } /** * Construct the qualified name used to uniquely identify a Table instance in Atlas. - * @param clusterName Name of the cluster to which the Hive component belongs + * @param metadataNamespace Name of the cluster to which the Hive component belongs * @param dbName Name of the Hive database to which the Table belongs * @param tableName Name of the Hive table * @param isTemporaryTable is this a temporary table * @return Unique qualified name to identify the Table instance in Atlas. */ - public static String getTableQualifiedName(String clusterName, String dbName, String tableName, boolean isTemporaryTable) { + public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName, boolean isTemporaryTable) { String tableTempName = tableName; if (isTemporaryTable) { @@ -863,11 +875,11 @@ public class HiveMetaStoreBridge { } } - return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), clusterName); + return String.format("%s.%s@%s", dbName.toLowerCase(), tableTempName.toLowerCase(), metadataNamespace); } - public static String getTableProcessQualifiedName(String clusterName, Table table) { - String tableQualifiedName = getTableQualifiedName(clusterName, table); + public static String getTableProcessQualifiedName(String metadataNamespace, Table table) { + String tableQualifiedName = getTableQualifiedName(metadataNamespace, table); long createdTime = getTableCreatedTime(table); return tableQualifiedName + SEP + createdTime; @@ -876,24 +888,24 @@ public class HiveMetaStoreBridge { /** * Construct the qualified name used to uniquely identify a Table instance in Atlas. - * @param clusterName Name of the cluster to which the Hive component belongs + * @param metadataNamespace Metadata namespace of the cluster to which the Hive component belongs * @param dbName Name of the Hive database to which the Table belongs * @param tableName Name of the Hive table * @return Unique qualified name to identify the Table instance in Atlas. */ - public static String getTableQualifiedName(String clusterName, String dbName, String tableName) { - return getTableQualifiedName(clusterName, dbName, tableName, false); + public static String getTableQualifiedName(String metadataNamespace, String dbName, String tableName) { + return getTableQualifiedName(metadataNamespace, dbName, tableName, false); } public static String getStorageDescQFName(String tableQualifiedName) { return tableQualifiedName + "_storage"; } public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) { - final String[] parts = tableQualifiedName.split("@"); - final String tableName = parts[0]; - final String clusterName = parts[1]; + final String[] parts = tableQualifiedName.split("@"); + final String tableName = parts[0]; + final String metadataNamespace = parts[1]; - return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName); + return String.format("%s.%s@%s", tableName, colName.toLowerCase(), metadataNamespace); } public static long getTableCreatedTime(Table table) { @@ -945,4 +957,4 @@ public class HiveMetaStoreBridge { } return ret; } -} +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index 76d6fe6..0eee7c1 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -38,12 +38,12 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable; public class AtlasHiveHookContext { - public static final char QNAME_SEP_CLUSTER_NAME = '@'; - public static final char QNAME_SEP_ENTITY_NAME = '.'; - public static final char QNAME_SEP_PROCESS = ':'; - public static final String TEMP_TABLE_PREFIX = "_temp-"; - public static final String CREATE_OPERATION = "CREATE"; - public static final String ALTER_OPERATION = "ALTER"; + public static final char QNAME_SEP_METADATA_NAMESPACE = '@'; + public static final char QNAME_SEP_ENTITY_NAME = '.'; + public static final char QNAME_SEP_PROCESS = ':'; + public static final String TEMP_TABLE_PREFIX = "_temp-"; + public static final String CREATE_OPERATION = "CREATE"; + public static final String ALTER_OPERATION = "ALTER"; private final HiveHook hook; private final HiveOperation hiveOperation; @@ -157,8 +157,8 @@ public class AtlasHiveHookContext { public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); } - public String getClusterName() { - return hook.getClusterName(); + public String getMetadataNamespace() { + return hook.getMetadataNamespace(); } public String getHostName() { return hook.getHostName(); } @@ -192,7 +192,7 @@ public class AtlasHiveHookContext { } public String getQualifiedName(Database db) { - return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + return (db.getName() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace(); } public String getQualifiedName(Table table) { @@ -206,7 +206,7 @@ public class AtlasHiveHookContext { } } - return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace(); } public boolean isKnownDatabase(String dbQualifiedName) { diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index ffa56ce..5b1f61b 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -55,7 +55,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { public enum PreprocessAction { NONE, IGNORE, PRUNE } public static final String CONF_PREFIX = "atlas.hook.hive."; - public static final String CONF_CLUSTER_NAME = "atlas.cluster.name"; public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; public static final String HOOK_NAME_CACHE_ENABLED = CONF_PREFIX + "name.cache.enabled"; public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count"; @@ -66,13 +65,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN = CONF_PREFIX + "hive_table.ignore.pattern"; public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN = CONF_PREFIX + "hive_table.prune.pattern"; public static final String HOOK_HIVE_TABLE_CACHE_SIZE = CONF_PREFIX + "hive_table.cache.size"; - - public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String DEFAULT_HOST_NAME = "localhost"; private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>(); - private static final String clusterName; private static final boolean convertHdfsPathToLowerCase; private static final boolean nameCacheEnabled; private static final int nameCacheDatabaseMaxCount; @@ -96,7 +92,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation); } - clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false); nameCacheEnabled = atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true); nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000); @@ -253,10 +248,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } } - public String getClusterName() { - return clusterName; - } - public boolean isConvertHdfsPathToLowerCase() { return convertHdfsPathToLowerCase; } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index 0bf3ce2..5ce4c77 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -65,7 +65,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_CLUSTER_NAME; +import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_METADATA_NAMESPACE; import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_ENTITY_NAME; import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_PROCESS; @@ -353,7 +353,7 @@ public abstract class BaseHiveEvent { ret.setAttribute(ATTRIBUTE_DESCRIPTION, db.getDescription()); ret.setAttribute(ATTRIBUTE_OWNER, db.getOwnerName()); - ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); + ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getMetadataNamespace()); ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(db.getLocationUri())); ret.setAttribute(ATTRIBUTE_PARAMETERS, db.getParameters()); @@ -599,7 +599,8 @@ public abstract class BaseHiveEvent { protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) { AtlasEntity ret; - String strPath = path.toString(); + String strPath = path.toString(); + String metadataNamespace = getMetadataNamespace(); if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) { strPath = strPath.toLowerCase(); @@ -607,8 +608,8 @@ public abstract class BaseHiveEvent { if (isS3Path(strPath)) { String bucketName = path.toUri().getAuthority(); - String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); - String pathQualifiedName = (strPath + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + String pathQualifiedName = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); ret = context.getEntity(pathQualifiedName); @@ -657,7 +658,7 @@ public abstract class BaseHiveEvent { ret.setAttribute(ATTRIBUTE_PATH, attrPath); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); ret.setAttribute(ATTRIBUTE_NAME, name); - ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); + ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); context.putEntity(pathQualifiedName, ret); } @@ -754,8 +755,8 @@ public abstract class BaseHiveEvent { return hiveDDL; } - protected String getClusterName() { - return context.getClusterName(); + protected String getMetadataNamespace() { + return context.getMetadataNamespace(); } protected Database getDatabases(String dbName) throws Exception { @@ -873,7 +874,7 @@ public abstract class BaseHiveEvent { protected String getQualifiedName(Table table, FieldSchema column) { String tblQualifiedName = getQualifiedName(table); - int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME); + int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_METADATA_NAMESPACE); if (sepPos == -1) { return tblQualifiedName + QNAME_SEP_ENTITY_NAME + column.getName().toLowerCase(); @@ -891,19 +892,20 @@ public abstract class BaseHiveEvent { } protected String getQualifiedName(BaseColumnInfo column) { - String dbName = column.getTabAlias().getTable().getDbName(); - String tableName = column.getTabAlias().getTable().getTableName(); - String colName = column.getColumn() != null ? column.getColumn().getName() : null; + String dbName = column.getTabAlias().getTable().getDbName(); + String tableName = column.getTabAlias().getTable().getTableName(); + String colName = column.getColumn() != null ? column.getColumn().getName() : null; + String metadataNamespace = getMetadataNamespace(); if (colName == null) { - return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; } else { - return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; } } protected String getQualifiedName(String dbName, String tableName, String colName) { - return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace(); } protected String getQualifiedName(URI location) { @@ -921,14 +923,14 @@ public abstract class BaseHiveEvent { protected String getQualifiedName(String path) { if (path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) { - return path + QNAME_SEP_CLUSTER_NAME + getClusterName(); + return path + QNAME_SEP_METADATA_NAMESPACE + getMetadataNamespace(); } return path.toLowerCase(); } protected String getColumnQualifiedName(String tblQualifiedName, String columnName) { - int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME); + int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_METADATA_NAMESPACE); if (sepPos == -1) { return tblQualifiedName + QNAME_SEP_ENTITY_NAME + columnName.toLowerCase(); @@ -983,26 +985,27 @@ public abstract class BaseHiveEvent { } protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) { - AtlasEntity ret = null; - HBaseTableInfo hBaseTableInfo = new HBaseTableInfo(table); - String hbaseNameSpace = hBaseTableInfo.getHbaseNameSpace(); - String hbaseTableName = hBaseTableInfo.getHbaseTableName(); + AtlasEntity ret = null; + HBaseTableInfo hBaseTableInfo = new HBaseTableInfo(table); + String hbaseNameSpace = hBaseTableInfo.getHbaseNameSpace(); + String hbaseTableName = hBaseTableInfo.getHbaseTableName(); + String metadataNamespace = getMetadataNamespace(); if (hbaseTableName != null) { AtlasEntity nsEntity = new AtlasEntity(HBASE_TYPE_NAMESPACE); nsEntity.setAttribute(ATTRIBUTE_NAME, hbaseNameSpace); - nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); - nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseNameSpaceQualifiedName(getClusterName(), hbaseNameSpace)); + nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); + nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseNameSpaceQualifiedName(metadataNamespace, hbaseNameSpace)); ret = new AtlasEntity(HBASE_TYPE_TABLE); ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName); ret.setAttribute(ATTRIBUTE_URI, hbaseTableName); - AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE); + AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE); ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject); - ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName)); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(metadataNamespace, hbaseNameSpace, hbaseTableName)); entities.addReferredEntity(nsEntity); entities.addEntity(ret); @@ -1024,12 +1027,12 @@ public abstract class BaseHiveEvent { return ret; } - private static String getHBaseTableQualifiedName(String clusterName, String nameSpace, String tableName) { - return String.format("%s:%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName); + private static String getHBaseTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) { + return String.format("%s:%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), metadataNamespace); } - private static String getHBaseNameSpaceQualifiedName(String clusterName, String nameSpace) { - return String.format("%s@%s", nameSpace.toLowerCase(), clusterName); + private static String getHBaseNameSpaceQualifiedName(String metadataNamespace, String nameSpace) { + return String.format("%s@%s", nameSpace.toLowerCase(), metadataNamespace); } private boolean ignoreHDFSPathsinProcessQualifiedName() { diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java index cbee7bf..0736153 100644 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java @@ -519,7 +519,7 @@ public class HiveITBase { Table outTable = entity.getTable(); //refresh table outTable = dgiBridge.getHiveClient().getTable(outTable.getDbName(), outTable.getTableName()); - return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(), outTable); + return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getMetadataNamespace(), outTable); } } diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java index d55aa53..4403aaf 100644 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java @@ -53,10 +53,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class HiveMetaStoreBridgeTest { - - private static final String TEST_DB_NAME = "default"; - public static final String CLUSTER_NAME = "primary"; - public static final String TEST_TABLE_NAME = "test_table"; + private static final String TEST_DB_NAME = "default"; + public static final String METADATA_NAMESPACE = "primary"; + public static final String TEST_TABLE_NAME = "test_table"; @Mock private Hive hiveClient; @@ -90,13 +89,13 @@ public class HiveMetaStoreBridgeTest { when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(db); when(hiveClient.getAllTables(TEST_DB_NAME)).thenReturn(Arrays.asList(new String[]{})); - returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE); when(atlasEntityWithExtInfo.getEntity("72e06b34-9151-4023-aa9d-b82103a50e76")) .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo( getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity()); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2); bridge.importHiveMetadata(null, null, true); // verify update is called @@ -109,7 +108,7 @@ public class HiveMetaStoreBridgeTest { List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME); - returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE); // return existing table @@ -119,7 +118,7 @@ public class HiveMetaStoreBridgeTest { when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))) + HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME, TEST_TABLE_NAME)))) .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); @@ -127,7 +126,7 @@ public class HiveMetaStoreBridgeTest { .thenReturn(createTableReference()); Table testTable = hiveTables.get(0); - String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, @@ -136,7 +135,7 @@ public class HiveMetaStoreBridgeTest { getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2); bridge.importHiveMetadata(null, null, true); // verify update is called on table @@ -144,13 +143,13 @@ public class HiveMetaStoreBridgeTest { } - private void returnExistingDatabase(String databaseName, AtlasClientV2 atlasClientV2, String clusterName) + private void returnExistingDatabase(String databaseName, AtlasClientV2 atlasClientV2, String metadataNamespace) throws AtlasServiceException { //getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_DB.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, TEST_DB_NAME)))) + HiveMetaStoreBridge.getDBQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME)))) .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo( getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76")))); @@ -179,16 +178,16 @@ public class HiveMetaStoreBridgeTest { List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME); Table hiveTable = hiveTables.get(0); - returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))) + HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME, TEST_TABLE_NAME)))) .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); - String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTable); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, hiveTable); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, @@ -206,7 +205,7 @@ public class HiveMetaStoreBridgeTest { when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new Partition[]{partition})); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2); try { bridge.importHiveMetadata(null, null, true); } catch (Exception e) { @@ -220,12 +219,12 @@ public class HiveMetaStoreBridgeTest { final String table2Name = TEST_TABLE_NAME + "_1"; List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name); - returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE); when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore")); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))) + HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME, TEST_TABLE_NAME)))) .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); @@ -233,7 +232,7 @@ public class HiveMetaStoreBridgeTest { .thenReturn(createTableReference()); Table testTable = hiveTables.get(1); - String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, @@ -241,7 +240,7 @@ public class HiveMetaStoreBridgeTest { .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2); try { bridge.importHiveMetadata(null, null, false); } catch (Exception e) { @@ -255,13 +254,13 @@ public class HiveMetaStoreBridgeTest { final String table2Name = TEST_TABLE_NAME + "_1"; List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name); - returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME); + returnExistingDatabase(TEST_DB_NAME, atlasClientV2, METADATA_NAMESPACE); when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore")); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME, TEST_TABLE_NAME)))) + HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME, TEST_TABLE_NAME)))) .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( getEntity(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); @@ -270,7 +269,7 @@ public class HiveMetaStoreBridgeTest { .thenReturn(createTableReference()); Table testTable = hiveTables.get(1); - String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, @@ -278,7 +277,7 @@ public class HiveMetaStoreBridgeTest { .thenReturn(new AtlasEntity.AtlasEntityWithExtInfo( getEntity(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77"))); - HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); + HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2); try { bridge.importHiveMetadata(null, null, true); Assert.fail("Table registration is supposed to fail"); diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java index 1305f65..51b2f83 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java @@ -33,9 +33,9 @@ import org.apache.commons.lang.StringUtils; * Contain the info related to an linear record from Impala */ public class AtlasImpalaHookContext { - public static final char QNAME_SEP_CLUSTER_NAME = '@'; - public static final char QNAME_SEP_ENTITY_NAME = '.'; - public static final char QNAME_SEP_PROCESS = ':'; + public static final char QNAME_SEP_METADATA_NAMESPACE = '@'; + public static final char QNAME_SEP_ENTITY_NAME = '.'; + public static final char QNAME_SEP_PROCESS = ':'; private final ImpalaLineageHook hook; private final ImpalaOperationType impalaOperation; @@ -69,8 +69,8 @@ public class AtlasImpalaHookContext { public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); } - public String getClusterName() { - return hook.getClusterName(); + public String getMetadataNamespace() { + return hook.getMetadataNamespace(); } public String getHostName() { @@ -82,7 +82,7 @@ public class AtlasImpalaHookContext { } public String getQualifiedNameForDb(String dbName) { - return (dbName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + return (dbName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace(); } public String getQualifiedNameForTable(String fullTableName) throws IllegalArgumentException { @@ -100,8 +100,7 @@ public class AtlasImpalaHookContext { } public String getQualifiedNameForTable(String dbName, String tableName) { - return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + - getClusterName(); + return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace(); } public String getQualifiedNameForColumn(LineageVertex vertex) { @@ -179,7 +178,7 @@ public class AtlasImpalaHookContext { public String getQualifiedNameForColumn(String dbName, String tableName, String columnName) { return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + - columnName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); + columnName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace(); } public String getUserName() { return lineageQuery.getUser(); } diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java index b5fdb6d..10ae08f 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java @@ -18,7 +18,6 @@ package org.apache.atlas.impala.hook; -import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME; import java.net.InetAddress; import java.net.UnknownHostException; import com.google.common.collect.Sets; @@ -42,20 +41,17 @@ public class ImpalaLineageHook extends AtlasHook { public static final String ATLAS_ENDPOINT = "atlas.rest.address"; public static final String REALM_SEPARATOR = "@"; public static final String CONF_PREFIX = "atlas.hook.impala."; - public static final String CONF_CLUSTER_NAME = "atlas.cluster.name"; public static final String CONF_REALM_NAME = "atlas.realm.name"; public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; public static final String DEFAULT_HOST_NAME = "localhost"; - private static final String clusterName; - private static final String realm; + private static final String realm; private static final boolean convertHdfsPathToLowerCase; - private static String hostName; + private static String hostName; static { - clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); - realm = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what should default be ?? - convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false); + realm = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what should default be ?? + convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false); try { hostName = InetAddress.getLocalHost().getHostName(); @@ -143,10 +139,6 @@ public class ImpalaLineageHook extends AtlasHook { return UserGroupInformation.getUGIFromSubject(userSubject); } - public String getClusterName() { - return clusterName; - } - public String getRealm() { return realm; } diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java index 4ea484f..5329e8b 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java @@ -340,7 +340,7 @@ public abstract class BaseImpalaEvent { ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName); ret.setAttribute(ATTRIBUTE_NAME, dbName.toLowerCase()); - ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getClusterName()); + ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getMetadataNamespace()); context.putEntity(dbQualifiedName, ret); } diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java index f1d0237..600ca5e 100644 --- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java @@ -293,7 +293,7 @@ public class ImpalaLineageITBase { protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception { LOG.debug("Searching for database: {}", dbName); - String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME; dbQualifiedName = dbQualifiedName.toLowerCase(); @@ -320,7 +320,7 @@ public class ImpalaLineageITBase { protected String assertTableIsRegistered(String fullTableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception { LOG.debug("Searching for table {}", fullTableName); - String tableQualifiedName = (fullTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME).toLowerCase() + + String tableQualifiedName = (fullTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + CLUSTER_NAME; return assertEntityIsRegistered(HIVE_TYPE_TABLE, REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName, diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java index 8ebb385..25e26e8 100644 --- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java @@ -77,7 +77,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { // the value is from info in IMPALA_3 String createTime = new Long((long)(1554750072)*1000).toString(); String processQFName = - "db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + "db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; processQFName = processQFName.toLowerCase(); @@ -140,7 +140,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { Long afterCreateTime = System.currentTimeMillis() / BaseImpalaEvent.MILLIS_CONVERT_FACTOR; String processQFNameWithoutTime = - dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS; processQFNameWithoutTime = processQFNameWithoutTime.toLowerCase(); @@ -210,7 +210,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { // the value is from info in IMPALA_4. String createTime = new Long(TABLE_CREATE_TIME*1000).toString(); String processQFName = - dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; processQFName = processQFName.toLowerCase(); @@ -266,7 +266,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { // the value is from info in IMPALA_4. String createTime = new Long(TABLE_CREATE_TIME*1000).toString(); String processQFName = - dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; processQFName = processQFName.toLowerCase(); @@ -322,9 +322,9 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { // the value is from info in IMPALA_4. String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString(); String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString(); - String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1; - String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2; String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase(); @@ -385,9 +385,9 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { // the value is from info in IMPALA_4. String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString(); String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString(); - String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1; - String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2; String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase(); @@ -454,7 +454,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { // the value is from info in IMPALA_4. String createTime = new Long((long)1560885039*1000).toString(); String processQFName = - dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; processQFName = processQFName.toLowerCase(); diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java index a7b9b0c..56d74fe 100644 --- a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java @@ -137,7 +137,7 @@ public class ImpalaLineageHookIT extends ImpalaLineageITBase { impalaHook.process(queryObj); String createTime = new Long(BaseImpalaEvent.getTableCreateTime(vertex5)).toString(); String processQFName = - vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + + vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE + CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; processQFName = processQFName.toLowerCase(); diff --git a/addons/kafka-bridge/src/bin/import-kafka.sh b/addons/kafka-bridge/src/bin/import-kafka.sh index 4bfb8b9..076eae8 100644 --- a/addons/kafka-bridge/src/bin/import-kafka.sh +++ b/addons/kafka-bridge/src/bin/import-kafka.sh @@ -117,7 +117,7 @@ else exit 1 fi -CP="${KAFKA_CP}:${ATLASCPPATH}:${HADOOP_CP}" +CP="${ATLASCPPATH}:${HADOOP_CP}:${KAFKA_CP}" # If running in cygwin, convert pathnames and classpath to Windows format. if [ "${CYGWIN}" == "true" ] diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java index 8755c9e..40b1fee 100644 --- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java +++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java @@ -58,19 +58,20 @@ import java.util.regex.Pattern; public class KafkaBridge { private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class); - private static final int EXIT_CODE_SUCCESS = 0; - private static final int EXIT_CODE_FAILED = 1; - private static final String ATLAS_ENDPOINT = "atlas.rest.address"; - private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; - private static final String KAFKA_CLUSTER_NAME = "atlas.cluster.name"; - private static final String DEFAULT_CLUSTER_NAME = "primary"; - private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; - private static final String DESCRIPTION_ATTR = "description"; - private static final String PARTITION_COUNT = "partitionCount"; - private static final String NAME = "name"; - private static final String URI = "uri"; - private static final String CLUSTERNAME = "clusterName"; - private static final String TOPIC = "topic"; + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_FAILED = 1; + private static final String ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace"; + private static final String DEFAULT_CLUSTER_NAME = "primary"; + private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + private static final String DESCRIPTION_ATTR = "description"; + private static final String PARTITION_COUNT = "partitionCount"; + private static final String NAME = "name"; + private static final String URI = "uri"; + private static final String CLUSTERNAME = "clusterName"; + private static final String TOPIC = "topic"; private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s"; private static final String ZOOKEEPER_CONNECT = "atlas.kafka.zookeeper.connect"; @@ -81,7 +82,7 @@ public class KafkaBridge { private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 * 1000; private final List<String> availableTopics; - private final String clusterName; + private final String metadataNamespace; private final AtlasClientV2 atlasClientV2; private final ZkUtils zkUtils; @@ -163,10 +164,18 @@ public class KafkaBridge { int connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS); ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$); - this.atlasClientV2 = atlasClientV2; - this.clusterName = atlasConf.getString(KAFKA_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); - this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled()); - this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()); + this.atlasClientV2 = atlasClientV2; + this.metadataNamespace = getMetadataNamespace(atlasConf); + this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled()); + this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()); + } + + private String getMetadataNamespace(Configuration config) { + return config.getString(KAFKA_METADATA_NAMESPACE, getClusterName(config)); + } + + private String getClusterName(Configuration config) { + return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME); } public void importTopic(String topicToImport) throws Exception { @@ -191,7 +200,7 @@ public class KafkaBridge { @VisibleForTesting AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception { - String topicQualifiedName = getTopicQualifiedName(clusterName, topic); + String topicQualifiedName = getTopicQualifiedName(metadataNamespace, topic); AtlasEntityWithExtInfo topicEntity = findTopicEntityInAtlas(topicQualifiedName); if (topicEntity == null) { @@ -225,10 +234,10 @@ public class KafkaBridge { ret = topicEntity; } - String qualifiedName = getTopicQualifiedName(clusterName, topic); + String qualifiedName = getTopicQualifiedName(metadataNamespace, topic); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName); - ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(CLUSTERNAME, metadataNamespace); ret.setAttribute(TOPIC, topic); ret.setAttribute(NAME,topic); ret.setAttribute(DESCRIPTION_ATTR, topic); @@ -239,8 +248,8 @@ public class KafkaBridge { } @VisibleForTesting - static String getTopicQualifiedName(String clusterName, String topic) { - return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), clusterName); + static String getTopicQualifiedName(String metadataNamespace, String topic) { + return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), metadataNamespace); } private AtlasEntityWithExtInfo findTopicEntityInAtlas(String topicQualifiedName) { diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index 5397a4b..3ccd426 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -52,8 +52,9 @@ import java.util.Date; public class SqoopHook extends SqoopJobDataPublisher { private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class); - public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name"; - public static final String DEFAULT_CLUSTER_NAME = "primary"; + public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + public static final String ATLAS_METADATA_NAMESPACE = "atlas.metadata.namespace"; + public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String USER = "userName"; public static final String DB_STORE_TYPE = "dbStoreType"; @@ -80,12 +81,14 @@ public class SqoopHook extends SqoopJobDataPublisher { @Override public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException { try { - Configuration atlasProperties = ApplicationProperties.get(); - String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); - AtlasEntity entDbStore = toSqoopDBStoreEntity(data); - AtlasEntity entHiveDb = toHiveDatabaseEntity(clusterName, data.getHiveDB()); - AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null; - AtlasEntity entProcess = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, clusterName); + Configuration atlasProperties = ApplicationProperties.get(); + String metadataNamespace = atlasProperties.getString(ATLAS_METADATA_NAMESPACE, getClusterName(atlasProperties)); + + AtlasEntity entDbStore = toSqoopDBStoreEntity(data); + AtlasEntity entHiveDb = toHiveDatabaseEntity(metadataNamespace, data.getHiveDB()); + AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null; + AtlasEntity entProcess = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, metadataNamespace); + AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess); @@ -105,11 +108,15 @@ public class SqoopHook extends SqoopJobDataPublisher { } } - private AtlasEntity toHiveDatabaseEntity(String clusterName, String dbName) { + private String getClusterName(Configuration config) { + return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME); + } + + private AtlasEntity toHiveDatabaseEntity(String metadataNamespace, String dbName) { AtlasEntity entHiveDb = new AtlasEntity(HiveDataTypes.HIVE_DB.getName()); - String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName); + String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName); - entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); + entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace); entHiveDb.setAttribute(AtlasClient.NAME, dbName); entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName); @@ -153,9 +160,10 @@ public class SqoopHook extends SqoopJobDataPublisher { return entDbStore; } - private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) { + private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable, + SqoopJobDataPublisher.Data data, String metadataNamespace) { AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName()); - String sqoopProcessName = getSqoopProcessName(data, clusterName); + String sqoopProcessName = getSqoopProcessName(data, metadataNamespace); Map<String, String> sqoopOptionsMap = new HashMap<>(); Properties options = data.getOptions(); @@ -190,7 +198,7 @@ public class SqoopHook extends SqoopJobDataPublisher { return data.getOperation().toLowerCase().equals("import"); } - static String getSqoopProcessName(Data data, String clusterName) { + static String getSqoopProcessName(Data data, String metadataNamespace) { StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl())); if (StringUtils.isNotEmpty(data.getHiveTable())) { @@ -204,9 +212,9 @@ public class SqoopHook extends SqoopJobDataPublisher { } if (data.getHiveTable() != null) { - name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName)); + name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), metadataNamespace)); } else { - name.append(String.format("--hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), clusterName)); + name.append(String.format("--hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), metadataNamespace)); } return name.toString(); diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 97668a3..517a3c3 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -118,7 +118,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { topology.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name()); topology.setAttribute(AtlasClient.OWNER, owner); topology.setAttribute("startTime", new Date(System.currentTimeMillis())); - topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); + topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getMetadataNamespace()); return topology; } @@ -166,9 +166,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { } private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) { - Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null); - String clusterName = null; - AtlasEntity ret = null; + Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null); + AtlasEntity ret = null; + String metadataNamespace = getMetadataNamespace(); // todo: need to redo this with a config driven approach switch (dataSetType) { @@ -188,8 +188,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { topologyOwner = ANONYMOUS_OWNER; } - clusterName = getClusterName(stormConf); - if (topicName == null) { LOG.error("Kafka topic name not found"); } else { @@ -198,7 +196,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ret.setAttribute("topic", topicName); ret.setAttribute("uri", uri); ret.setAttribute(AtlasClient.OWNER, topologyOwner); - ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(clusterName, topicName)); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(metadataNamespace, topicName)); ret.setAttribute(AtlasClient.NAME, topicName); } } @@ -212,7 +210,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { uri = hbaseTableName; } - clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf); + metadataNamespace = extractComponentMetadataNamespace(HBaseConfiguration.create(), stormConf); if (hbaseTableName == null) { LOG.error("HBase table name not found"); @@ -223,7 +221,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ret.setAttribute(AtlasClient.NAME, uri); ret.setAttribute(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal")); //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName - ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName)); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(metadataNamespace, HBASE_NAMESPACE_DEFAULT, hbaseTableName)); } } break; @@ -234,11 +232,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { final Path hdfsPath = new Path(hdfsPathStr); final String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr); - clusterName = getClusterName(stormConf); - ret = new AtlasEntity(HiveMetaStoreBridge.HDFS_PATH); - ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); + ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace); ret.setAttribute(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal")); ret.setAttribute(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase()); @@ -247,16 +243,16 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ret.setAttribute("path", updatedPath); ret.setAttribute("nameServiceId", nameServiceID); - ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedPath)); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(metadataNamespace, updatedPath)); } else { ret.setAttribute("path", hdfsPathStr); - ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, hdfsPathStr)); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(metadataNamespace, hdfsPathStr)); } } break; case "HiveBolt": { - clusterName = extractComponentClusterName(new HiveConf(), stormConf); + metadataNamespace = extractComponentMetadataNamespace(new HiveConf(), stormConf); final String dbName = config.get("HiveBolt.options.databaseName"); final String tblName = config.get("HiveBolt.options.tableName"); @@ -267,8 +263,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { AtlasEntity dbEntity = new AtlasEntity("hive_db"); dbEntity.setAttribute(AtlasClient.NAME, dbName); - dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName)); - dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); + dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName)); + dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace); entityExtInfo.addReferredEntity(dbEntity); @@ -277,7 +273,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ret.setAttribute(AtlasClient.NAME, tblName); ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity)); - ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName)); + ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName)); } } break; @@ -384,30 +380,25 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { } } - public static String getKafkaTopicQualifiedName(String clusterName, String topicName) { - return String.format("%s@%s", topicName.toLowerCase(), clusterName); + public static String getKafkaTopicQualifiedName(String metadataNamespace, String topicName) { + return String.format("%s@%s", topicName.toLowerCase(), metadataNamespace); } - public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) { - return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName); + public static String getHbaseTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) { + return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), metadataNamespace); } - public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) { - return String.format("%s@%s", hdfsPath.toLowerCase(), clusterName); + public static String getHdfsPathQualifiedName(String metadataNamespace, String hdfsPath) { + return String.format("%s@%s", hdfsPath.toLowerCase(), metadataNamespace); } - private String getClusterName(Map stormConf) { - return atlasProperties.getString(AtlasConstants.CLUSTER_NAME_KEY, AtlasConstants.DEFAULT_CLUSTER_NAME); - } - - private String extractComponentClusterName(Configuration configuration, Map stormConf) { - String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null); + private String extractComponentMetadataNamespace(Configuration configuration, Map stormConf) { + String clusterName = configuration.get(CLUSTER_NAME_KEY, null); if (clusterName == null) { - clusterName = getClusterName(stormConf); + clusterName = getMetadataNamespace(); } return clusterName; } - -} +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java b/common/src/main/java/org/apache/atlas/AtlasConstants.java index 2b9f411..2b7b4b3 100644 --- a/common/src/main/java/org/apache/atlas/AtlasConstants.java +++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java @@ -25,16 +25,16 @@ public final class AtlasConstants { private AtlasConstants() { } - public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; - public static final String DEFAULT_CLUSTER_NAME = "primary"; - public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName"; - public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port"; - public static final String DEFAULT_APP_PORT_STR = "21000"; - public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address"; - public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000"; - public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30; - public static final String DEFAULT_TYPE_VERSION = "1.0"; - + public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port"; + public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address"; public static final String ATLAS_MIGRATION_MODE_FILENAME = "atlas.migration.data.filename"; public static final String ATLAS_SERVICES_ENABLED = "atlas.services.enabled"; + + public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName"; + public static final String DEFAULT_APP_PORT_STR = "21000"; + public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000"; + public static final String DEFAULT_TYPE_VERSION = "1.0"; + public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30; + } diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 0030276..cc6546b 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -59,10 +59,14 @@ public abstract class AtlasHook { public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = "atlas.notification.failed.messages.filename"; public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = "atlas.notification.log.failed.messages"; public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log"; + public static final String CONF_METADATA_NAMESPACE = "atlas.metadata.namespace"; + public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + public static final String DEFAULT_CLUSTER_NAME = "primary"; protected static Configuration atlasProperties; protected static NotificationInterface notificationInterface; + private static final String metadataNamespace; private static final int SHUTDOWN_HOOK_WAIT_TIME_MS = 3000; private static final boolean logFailedMessages; private static final FailedMessagesLogger failedMessagesLogger; @@ -95,6 +99,7 @@ public abstract class AtlasHook { } } + metadataNamespace = getMetadataNamespace(atlasProperties); notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3); notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000); notificationInterface = NotificationProvider.get(); @@ -306,4 +311,15 @@ public abstract class AtlasHook { return ret; } -} + private static String getMetadataNamespace(Configuration config) { + return config.getString(CONF_METADATA_NAMESPACE, getClusterName(config)); + } + + private static String getClusterName(Configuration config) { + return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME); + } + + public String getMetadataNamespace() { + return metadataNamespace; + } +} \ No newline at end of file
