This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new ef40251 ATLAS-3321: Introduce atlas metadata namespace
ef40251 is described below
commit ef4025160dd456f3ac67ef39551ba280925a6a94
Author: Sarath Subramanian <[email protected]>
AuthorDate: Thu Jul 11 16:41:58 2019 -0700
ATLAS-3321: Introduce atlas metadata namespace
---
addons/hbase-bridge/src/bin/import-hbase.sh | 2 +-
.../apache/atlas/hbase/bridge/HBaseAtlasHook.java | 75 +++++++---------
.../org/apache/atlas/hbase/bridge/HBaseBridge.java | 85 ++++++++++--------
addons/hive-bridge/src/bin/import-hive.sh | 2 +-
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 100 ++++++++++++---------
.../atlas/hive/hook/AtlasHiveHookContext.java | 20 ++---
.../java/org/apache/atlas/hive/hook/HiveHook.java | 9 --
.../atlas/hive/hook/events/BaseHiveEvent.java | 61 +++++++------
.../java/org/apache/atlas/hive/HiveITBase.java | 2 +-
.../atlas/hive/bridge/HiveMetaStoreBridgeTest.java | 47 +++++-----
.../atlas/impala/hook/AtlasImpalaHookContext.java | 17 ++--
.../atlas/impala/hook/ImpalaLineageHook.java | 16 +---
.../atlas/impala/hook/events/BaseImpalaEvent.java | 2 +-
.../apache/atlas/impala/ImpalaLineageITBase.java | 4 +-
.../apache/atlas/impala/ImpalaLineageToolIT.java | 18 ++--
.../atlas/impala/hook/ImpalaLineageHookIT.java | 2 +-
addons/kafka-bridge/src/bin/import-kafka.sh | 2 +-
.../org/apache/atlas/kafka/bridge/KafkaBridge.java | 55 +++++++-----
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 40 +++++----
.../apache/atlas/storm/hook/StormAtlasHook.java | 57 +++++-------
.../main/java/org/apache/atlas/AtlasConstants.java | 20 ++---
.../main/java/org/apache/atlas/hook/AtlasHook.java | 18 +++-
22 files changed, 336 insertions(+), 318 deletions(-)
diff --git a/addons/hbase-bridge/src/bin/import-hbase.sh
b/addons/hbase-bridge/src/bin/import-hbase.sh
index b9c1a4b..34c4180 100644
--- a/addons/hbase-bridge/src/bin/import-hbase.sh
+++ b/addons/hbase-bridge/src/bin/import-hbase.sh
@@ -114,7 +114,7 @@ else
exit 1
fi
-CP="${ATLASCPPATH}:${HBASE_CP}:${HADOOP_CP}"
+CP="${HBASE_CP}:${HADOOP_CP}:${ATLASCPPATH}"
# If running in cygwin, convert pathnames and classpath to Windows format.
if [ "${CYGWIN}" == "true" ]
diff --git
a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
index 1825cd2..6d062e2 100644
---
a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
+++
b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseAtlasHook.java
@@ -30,7 +30,6 @@ import
org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV
import
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -55,20 +54,17 @@ public class HBaseAtlasHook extends AtlasHook {
private static final Logger LOG =
LoggerFactory.getLogger(HBaseAtlasHook.class);
- public static final String HBASE_CLUSTER_NAME = "atlas.cluster.name";
- public static final String DEFAULT_CLUSTER_NAME = "primary";
- public static final String ATTR_DESCRIPTION = "description";
- public static final String ATTR_ATLAS_ENDPOINT = "atlas.rest.address";
- public static final String ATTR_COMMENT = "comment";
- public static final String ATTR_PARAMETERS = "parameters";
- public static final String ATTR_URI = "uri";
- public static final String ATTR_NAMESPACE = "namespace";
- public static final String ATTR_TABLE = "table";
- public static final String ATTR_COLUMNFAMILIES = "column_families";
- public static final String ATTR_CREATE_TIME = "createTime";
- public static final String ATTR_MODIFIED_TIME = "modifiedTime";
- public static final String ATTR_OWNER = "owner";
- public static final String ATTR_NAME = "name";
+ public static final String ATTR_DESCRIPTION = "description";
+ public static final String ATTR_ATLAS_ENDPOINT =
"atlas.rest.address";
+ public static final String ATTR_PARAMETERS = "parameters";
+ public static final String ATTR_URI = "uri";
+ public static final String ATTR_NAMESPACE = "namespace";
+ public static final String ATTR_TABLE = "table";
+ public static final String ATTR_COLUMNFAMILIES = "column_families";
+ public static final String ATTR_CREATE_TIME = "createTime";
+ public static final String ATTR_MODIFIED_TIME = "modifiedTime";
+ public static final String ATTR_OWNER = "owner";
+ public static final String ATTR_NAME = "name";
// column addition metadata
public static final String ATTR_TABLE_MAX_FILESIZE =
"maxFileSize";
@@ -106,7 +102,6 @@ public class HBaseAtlasHook extends AtlasHook {
public static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT =
"%s:%s.%s@%s";
private static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
- private String clusterName = null;
private static volatile HBaseAtlasHook me;
@@ -141,7 +136,7 @@ public class HBaseAtlasHook extends AtlasHook {
ret = me;
if (ret == null) {
- me = ret = new HBaseAtlasHook(atlasProperties);
+ me = ret = new HBaseAtlasHook();
}
}
} catch (Exception e) {
@@ -152,15 +147,9 @@ public class HBaseAtlasHook extends AtlasHook {
return ret;
}
- public HBaseAtlasHook(Configuration atlasProperties) {
- this(atlasProperties.getString(HBASE_CLUSTER_NAME,
DEFAULT_CLUSTER_NAME));
+ public HBaseAtlasHook() {
}
- public HBaseAtlasHook(String clusterName) {
- this.clusterName = clusterName;
- }
-
-
public void createAtlasInstances(HBaseOperationContext
hbaseOperationContext) {
OPERATION operation = hbaseOperationContext.getOperation();
@@ -210,7 +199,7 @@ public class HBaseAtlasHook extends AtlasHook {
}
private void deleteNameSpaceInstance(HBaseOperationContext
hbaseOperationContext) {
- String nameSpaceQName = getNameSpaceQualifiedName(clusterName,
hbaseOperationContext.getNameSpace());
+ String nameSpaceQName =
getNameSpaceQualifiedName(getMetadataNamespace(),
hbaseOperationContext.getNameSpace());
AtlasObjectId nameSpaceId = new
AtlasObjectId(HBaseDataTypes.HBASE_NAMESPACE.getName(),
REFERENCEABLE_ATTRIBUTE_NAME, nameSpaceQName);
LOG.info("Delete NameSpace {}", nameSpaceQName);
@@ -259,7 +248,7 @@ public class HBaseAtlasHook extends AtlasHook {
}
String tableNameStr = tableName.getNameAsString();
- String tableQName = getTableQualifiedName(clusterName,
nameSpaceName, tableNameStr);
+ String tableQName =
getTableQualifiedName(getMetadataNamespace(), nameSpaceName, tableNameStr);
AtlasObjectId tableId = new
AtlasObjectId(HBaseDataTypes.HBASE_TABLE.getName(),
REFERENCEABLE_ATTRIBUTE_NAME, tableQName);
LOG.info("Delete Table {}", tableQName);
@@ -302,7 +291,7 @@ public class HBaseAtlasHook extends AtlasHook {
String tableNameStr = tableName.getNameAsString();
String columnFamilyName =
hbaseOperationContext.getColummFamily();
- String columnFamilyQName =
getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableNameStr,
columnFamilyName);
+ String columnFamilyQName =
getColumnFamilyQualifiedName(getMetadataNamespace(), nameSpaceName,
tableNameStr, columnFamilyName);
AtlasObjectId columnFamilyId = new
AtlasObjectId(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(),
REFERENCEABLE_ATTRIBUTE_NAME, columnFamilyQName);
LOG.info("Delete ColumnFamily {}", columnFamilyQName);
@@ -314,48 +303,48 @@ public class HBaseAtlasHook extends AtlasHook {
/**
* Construct the qualified name used to uniquely identify a ColumnFamily
instance in Atlas.
*
- * @param clusterName Name of the cluster to which the HBase component
belongs
+ * @param metadataNamespace Metadata namespace of the cluster to which
the HBase component belongs
* @param nameSpace Name of the HBase database to which the Table
belongs
* @param tableName Name of the HBase table
* @param columnFamily Name of the ColumnFamily
* @return Unique qualified name to identify the Table instance in Atlas.
*/
- public static String getColumnFamilyQualifiedName(String clusterName,
String nameSpace, String tableName, String columnFamily) {
- if (clusterName == null || nameSpace == null || tableName == null ||
columnFamily == null) {
+ public static String getColumnFamilyQualifiedName(String
metadataNamespace, String nameSpace, String tableName, String columnFamily) {
+ if (metadataNamespace == null || nameSpace == null || tableName ==
null || columnFamily == null) {
return null;
} else {
- return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT,
nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()),
columnFamily.toLowerCase(), clusterName);
+ return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT,
nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()),
columnFamily.toLowerCase(), metadataNamespace);
}
}
/**
* Construct the qualified name used to uniquely identify a Table instance
in Atlas.
*
- * @param clusterName Name of the cluster to which the HBase component
belongs
+ * @param metadataNamespace Metadata namespace of the cluster to which
the HBase component belongs
* @param nameSpace Name of the HBase database to which the Table belongs
* @param tableName Name of the HBase table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
- public static String getTableQualifiedName(String clusterName, String
nameSpace, String tableName) {
- if (clusterName == null || nameSpace == null || tableName == null) {
+ public static String getTableQualifiedName(String metadataNamespace,
String nameSpace, String tableName) {
+ if (metadataNamespace == null || nameSpace == null || tableName ==
null) {
return null;
} else {
- return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT,
nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()), clusterName);
+ return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT,
nameSpace.toLowerCase(), stripNameSpace(tableName.toLowerCase()),
metadataNamespace);
}
}
/**
* Construct the qualified name used to uniquely identify a HBase
NameSpace instance in Atlas.
*
- * @param clusterName Name of the cluster to which the HBase component
belongs
+ * @param metadataNamespace Metadata namespace of the cluster to which
the HBase component belongs
* @param nameSpace
* @return Unique qualified name to identify the HBase NameSpace instance
in Atlas.
*/
- public static String getNameSpaceQualifiedName(String clusterName, String
nameSpace) {
- if (clusterName == null || nameSpace == null) {
+ public static String getNameSpaceQualifiedName(String metadataNamespace,
String nameSpace) {
+ if (metadataNamespace == null || nameSpace == null) {
return null;
} else {
- return String.format(HBASE_NAMESPACE_QUALIFIED_NAME,
nameSpace.toLowerCase(), clusterName);
+ return String.format(HBASE_NAMESPACE_QUALIFIED_NAME,
nameSpace.toLowerCase(), metadataNamespace);
}
}
@@ -375,8 +364,8 @@ public class HBaseAtlasHook extends AtlasHook {
Date now = new Date(System.currentTimeMillis());
nameSpace.setAttribute(ATTR_NAME, nameSpaceName);
- nameSpace.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME,
getNameSpaceQualifiedName(clusterName, nameSpaceName));
- nameSpace.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
clusterName);
+ nameSpace.setAttribute(REFERENCEABLE_ATTRIBUTE_NAME,
getNameSpaceQualifiedName(getMetadataNamespace(), nameSpaceName));
+ nameSpace.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
getMetadataNamespace());
nameSpace.setAttribute(ATTR_DESCRIPTION, nameSpaceName);
nameSpace.setAttribute(ATTR_PARAMETERS,
hbaseOperationContext.getHbaseConf());
nameSpace.setAttribute(ATTR_OWNER, hbaseOperationContext.getOwner());
@@ -393,7 +382,7 @@ public class HBaseAtlasHook extends AtlasHook {
AtlasEntity table = new
AtlasEntity(HBaseDataTypes.HBASE_TABLE.getName());
String tableName = getTableName(hbaseOperationContext);
String nameSpaceName = (String) nameSpace.getAttribute(ATTR_NAME);
- String tableQName = getTableQualifiedName(clusterName,
nameSpaceName, tableName);
+ String tableQName =
getTableQualifiedName(getMetadataNamespace(), nameSpaceName, tableName);
OPERATION operation = hbaseOperationContext.getOperation();
Date now = new Date(System.currentTimeMillis());
@@ -455,7 +444,7 @@ public class HBaseAtlasHook extends AtlasHook {
String columnFamilyName =
columnFamilyDescriptor.getNameAsString();
String tableName = (String) table.getAttribute(ATTR_NAME);
String nameSpaceName = (String)
nameSpace.getAttribute(ATTR_NAME);
- String columnFamilyQName =
getColumnFamilyQualifiedName(clusterName, nameSpaceName, tableName,
columnFamilyName);
+ String columnFamilyQName =
getColumnFamilyQualifiedName(getMetadataNamespace(), nameSpaceName, tableName,
columnFamilyName);
Date now = new Date(System.currentTimeMillis());
columnFamily.setAttribute(ATTR_NAME, columnFamilyName);
diff --git
a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
index 1765c18..4a4b4d9 100644
---
a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
+++
b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java
@@ -65,21 +65,22 @@ import java.util.regex.Pattern;
public class HBaseBridge {
private static final Logger LOG =
LoggerFactory.getLogger(HBaseBridge.class);
- private static final int EXIT_CODE_SUCCESS = 0;
- private static final int EXIT_CODE_FAILED = 1;
- private static final String ATLAS_ENDPOINT = "atlas.rest.address";
- private static final String DEFAULT_ATLAS_URL =
"http://localhost:21000/";
- private static final String HBASE_CLUSTER_NAME = "atlas.cluster.name";
- private static final String DEFAULT_CLUSTER_NAME = "primary";
- private static final String QUALIFIED_NAME = "qualifiedName";
- private static final String NAME = "name";
- private static final String URI = "uri";
- private static final String OWNER = "owner";
- private static final String DESCRIPTION_ATTR = "description";
- private static final String CLUSTERNAME = "clusterName";
- private static final String NAMESPACE = "namespace";
- private static final String TABLE = "table";
- private static final String COLUMN_FAMILIES = "column_families";
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_FAILED = 1;
+ private static final String ATLAS_ENDPOINT =
"atlas.rest.address";
+ private static final String DEFAULT_ATLAS_URL =
"http://localhost:21000/";
+ private static final String CLUSTER_NAME_KEY =
"atlas.cluster.name";
+ private static final String DEFAULT_CLUSTER_NAME = "primary";
+ private static final String HBASE_METADATA_NAMESPACE =
"atlas.metadata.namespace";
+ private static final String QUALIFIED_NAME = "qualifiedName";
+ private static final String NAME = "name";
+ private static final String URI = "uri";
+ private static final String OWNER = "owner";
+ private static final String DESCRIPTION_ATTR = "description";
+ private static final String CLUSTERNAME = "clusterName";
+ private static final String NAMESPACE = "namespace";
+ private static final String TABLE = "table";
+ private static final String COLUMN_FAMILIES =
"column_families";
// table metadata
private static final String ATTR_TABLE_MAX_FILESIZE =
"maxFileSize";
@@ -115,9 +116,9 @@ public class HBaseBridge {
private static final String HBASE_TABLE_QUALIFIED_NAME_FORMAT =
"%s:%s@%s";
private static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT =
"%s:%s.%s@%s";
- private final String clusterName;
- private final AtlasClientV2 atlasClientV2;
- private final Admin hbaseAdmin;
+ private final String metadataNamespace;
+ private final AtlasClientV2 atlasClientV2;
+ private final Admin hbaseAdmin;
public static void main(String[] args) {
@@ -204,8 +205,8 @@ public class HBaseBridge {
}
public HBaseBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2)
throws Exception {
- this.atlasClientV2 = atlasClientV2;
- this.clusterName = atlasConf.getString(HBASE_CLUSTER_NAME,
DEFAULT_CLUSTER_NAME);
+ this.atlasClientV2 = atlasClientV2;
+ this.metadataNamespace = getMetadataNamespace(atlasConf);
org.apache.hadoop.conf.Configuration conf =
HBaseConfiguration.create();
@@ -220,6 +221,14 @@ public class HBaseBridge {
hbaseAdmin = conn.getAdmin();
}
+ private String getMetadataNamespace(Configuration config) {
+ return config.getString(HBASE_METADATA_NAMESPACE,
getClusterName(config));
+ }
+
+ private String getClusterName(Configuration config) {
+ return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+ }
+
private boolean importHBaseEntities(String namespaceToImport, String
tableToImport) throws Exception {
boolean ret = false;
@@ -367,7 +376,7 @@ public class HBaseBridge {
protected AtlasEntityWithExtInfo
createOrUpdateNameSpace(NamespaceDescriptor namespaceDescriptor) throws
Exception {
String nsName = namespaceDescriptor.getName();
- String nsQualifiedName =
getNameSpaceQualifiedName(clusterName, nsName);
+ String nsQualifiedName =
getNameSpaceQualifiedName(metadataNamespace, nsName);
AtlasEntityWithExtInfo nsEntity =
findNameSpaceEntityInAtlas(nsQualifiedName);
if (nsEntity == null) {
@@ -390,7 +399,7 @@ public class HBaseBridge {
protected AtlasEntityWithExtInfo createOrUpdateTable(String nameSpace,
String tableName, AtlasEntity nameSapceEntity, TableDescriptor htd,
ColumnFamilyDescriptor[] hcdts) throws Exception {
String owner = htd.getOwnerString();
- String tblQualifiedName =
getTableQualifiedName(clusterName, nameSpace, tableName);
+ String tblQualifiedName =
getTableQualifiedName(metadataNamespace, nameSpace, tableName);
AtlasEntityWithExtInfo ret =
findTableEntityInAtlas(tblQualifiedName);
if (ret == null) {
@@ -436,7 +445,7 @@ public class HBaseBridge {
for (ColumnFamilyDescriptor columnFamilyDescriptor : hcdts) {
String cfName =
columnFamilyDescriptor.getNameAsString();
- String cfQualifiedName =
getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName);
+ String cfQualifiedName =
getColumnFamilyQualifiedName(metadataNamespace, nameSpace, tableName, cfName);
AtlasEntityWithExtInfo cfEntity =
findColumnFamiltyEntityInAtlas(cfQualifiedName);
if (cfEntity == null) {
@@ -516,10 +525,10 @@ public class HBaseBridge {
ret = nsEtity;
}
- String qualifiedName = getNameSpaceQualifiedName(clusterName,
nameSpace);
+ String qualifiedName = getNameSpaceQualifiedName(metadataNamespace,
nameSpace);
ret.setAttribute(QUALIFIED_NAME, qualifiedName);
- ret.setAttribute(CLUSTERNAME, clusterName);
+ ret.setAttribute(CLUSTERNAME, metadataNamespace);
ret.setAttribute(NAME, nameSpace);
ret.setAttribute(DESCRIPTION_ATTR, nameSpace);
@@ -535,10 +544,10 @@ public class HBaseBridge {
ret = atlasEntity;
}
- String tableQualifiedName = getTableQualifiedName(clusterName,
nameSpace, tableName);
+ String tableQualifiedName = getTableQualifiedName(metadataNamespace,
nameSpace, tableName);
ret.setAttribute(QUALIFIED_NAME, tableQualifiedName);
- ret.setAttribute(CLUSTERNAME, clusterName);
+ ret.setAttribute(CLUSTERNAME, metadataNamespace);
ret.setAttribute(NAMESPACE,
AtlasTypeUtil.getAtlasObjectId(nameSpaceEntity));
ret.setAttribute(NAME, tableName);
ret.setAttribute(DESCRIPTION_ATTR, tableName);
@@ -564,10 +573,10 @@ public class HBaseBridge {
}
String cfName = hcdt.getNameAsString();
- String cfQualifiedName = getColumnFamilyQualifiedName(clusterName,
nameSpace, tableName, cfName);
+ String cfQualifiedName =
getColumnFamilyQualifiedName(metadataNamespace, nameSpace, tableName, cfName);
ret.setAttribute(QUALIFIED_NAME, cfQualifiedName);
- ret.setAttribute(CLUSTERNAME, clusterName);
+ ret.setAttribute(CLUSTERNAME, metadataNamespace);
ret.setAttribute(TABLE, tableId);
ret.setAttribute(NAME, cfName);
ret.setAttribute(DESCRIPTION_ATTR, cfName);
@@ -637,37 +646,37 @@ public class HBaseBridge {
/**
* Construct the qualified name used to uniquely identify a ColumnFamily
instance in Atlas.
- * @param clusterName Name of the cluster to which the Hbase component
belongs
+ * @param metadataNamespace Metadata namespace of the cluster to which the
Hbase component belongs
* @param nameSpace Name of the Hbase database to which the Table belongs
* @param tableName Name of the Hbase table
* @param columnFamily Name of the ColumnFamily
* @return Unique qualified name to identify the Table instance in Atlas.
*/
- private static String getColumnFamilyQualifiedName(String clusterName,
String nameSpace, String tableName, String columnFamily) {
+ private static String getColumnFamilyQualifiedName(String
metadataNamespace, String nameSpace, String tableName, String columnFamily) {
tableName = stripNameSpace(tableName.toLowerCase());
- return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT,
nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(), clusterName);
+ return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT,
nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(),
metadataNamespace);
}
/**
* Construct the qualified name used to uniquely identify a Table instance
in Atlas.
- * @param clusterName Name of the cluster to which the Hbase component
belongs
+ * @param metadataNamespace Metadata namespace of the cluster to which the
Hbase component belongs
* @param nameSpace Name of the Hbase database to which the Table belongs
* @param tableName Name of the Hbase table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
- private static String getTableQualifiedName(String clusterName, String
nameSpace, String tableName) {
+ private static String getTableQualifiedName(String metadataNamespace,
String nameSpace, String tableName) {
tableName = stripNameSpace(tableName.toLowerCase());
- return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT,
nameSpace.toLowerCase(), tableName, clusterName);
+ return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT,
nameSpace.toLowerCase(), tableName, metadataNamespace);
}
/**
* Construct the qualified name used to uniquely identify a Hbase
NameSpace instance in Atlas.
- * @param clusterName Name of the cluster to which the Hbase component
belongs
+ * @param metadataNamespace Metadata namespace of the cluster to which the
Hbase component belongs
* @param nameSpace Name of the NameSpace
* @return Unique qualified name to identify the HBase NameSpace instance
in Atlas.
*/
- private static String getNameSpaceQualifiedName(String clusterName, String
nameSpace) {
- return String.format(HBASE_NAMESPACE_QUALIFIED_NAME,
nameSpace.toLowerCase(), clusterName);
+ private static String getNameSpaceQualifiedName(String metadataNamespace,
String nameSpace) {
+ return String.format(HBASE_NAMESPACE_QUALIFIED_NAME,
nameSpace.toLowerCase(), metadataNamespace);
}
private static String stripNameSpace(String tableName){
diff --git a/addons/hive-bridge/src/bin/import-hive.sh
b/addons/hive-bridge/src/bin/import-hive.sh
index 38ec3be..b1660ea 100755
--- a/addons/hive-bridge/src/bin/import-hive.sh
+++ b/addons/hive-bridge/src/bin/import-hive.sh
@@ -109,7 +109,7 @@ else
exit 1
fi
-CP="${ATLASCPPATH}:${HIVE_CP}:${HADOOP_CP}"
+CP="${HIVE_CP}:${HADOOP_CP}:${ATLASCPPATH}"
# If running in cygwin, convert pathnames and classpath to Windows format.
if [ "${CYGWIN}" == "true" ]
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 5f8f846..049112b 100755
---
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -83,7 +83,8 @@ public class HiveMetaStoreBridge {
private static final Logger LOG =
LoggerFactory.getLogger(HiveMetaStoreBridge.class);
public static final String CONF_PREFIX =
"atlas.hook.hive.";
- public static final String HIVE_CLUSTER_NAME =
"atlas.cluster.name";
+ public static final String CLUSTER_NAME_KEY =
"atlas.cluster.name";
+ public static final String HIVE_METADATA_NAMESPACE =
"atlas.metadata.namespace";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX +
"hdfs_path.convert_to_lowercase";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String TEMP_TABLE_PREFIX = "_temp-";
@@ -95,10 +96,10 @@ public class HiveMetaStoreBridge {
private static final int EXIT_CODE_FAILED = 1;
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
- private final String clusterName;
- private final Hive hiveClient;
- private final AtlasClientV2 atlasClientV2;
- private final boolean convertHdfsPathToLowerCase;
+ private final String metadataNamespace;
+ private final Hive hiveClient;
+ private final AtlasClientV2 atlasClientV2;
+ private final boolean convertHdfsPathToLowerCase;
public static void main(String[] args) {
@@ -209,7 +210,10 @@ public class HiveMetaStoreBridge {
* @param hiveConf {@link HiveConf} for Hive component in the cluster
*/
public HiveMetaStoreBridge(Configuration atlasProperties, HiveConf
hiveConf, AtlasClientV2 atlasClientV2) throws Exception {
- this(atlasProperties.getString(HIVE_CLUSTER_NAME,
DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClientV2,
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false));
+ this.metadataNamespace =
getMetadataNamespace(atlasProperties);
+ this.hiveClient = Hive.get(hiveConf);
+ this.atlasClientV2 = atlasClientV2;
+ this.convertHdfsPathToLowerCase =
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
}
/**
@@ -220,19 +224,27 @@ public class HiveMetaStoreBridge {
this(atlasProperties, hiveConf, null);
}
- HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2
atlasClientV2) {
- this(clusterName, hiveClient, atlasClientV2, true);
+ HiveMetaStoreBridge(String metadataNamespace, Hive hiveClient,
AtlasClientV2 atlasClientV2) {
+ this(metadataNamespace, hiveClient, atlasClientV2, true);
}
- HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClientV2
atlasClientV2, boolean convertHdfsPathToLowerCase) {
- this.clusterName = clusterName;
+ HiveMetaStoreBridge(String metadataNamespace, Hive hiveClient,
AtlasClientV2 atlasClientV2, boolean convertHdfsPathToLowerCase) {
+ this.metadataNamespace = metadataNamespace;
this.hiveClient = hiveClient;
this.atlasClientV2 = atlasClientV2;
this.convertHdfsPathToLowerCase = convertHdfsPathToLowerCase;
}
- public String getClusterName() {
- return clusterName;
+ public String getMetadataNamespace(Configuration config) {
+ return config.getString(HIVE_METADATA_NAMESPACE,
getClusterName(config));
+ }
+
+ private String getClusterName(Configuration config) {
+ return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+ }
+
+ public String getMetadataNamespace() {
+ return metadataNamespace;
}
public Hive getHiveClient() {
@@ -337,7 +349,7 @@ public class HiveMetaStoreBridge {
AtlasEntityWithExtInfo tableEntity = registerTable(dbEntity,
table);
if (table.getTableType() == TableType.EXTERNAL_TABLE) {
- String processQualifiedName =
getTableProcessQualifiedName(clusterName, table);
+ String processQualifiedName =
getTableProcessQualifiedName(metadataNamespace, table);
AtlasEntityWithExtInfo processEntity =
findProcessEntity(processQualifiedName);
if (processEntity == null) {
@@ -350,7 +362,7 @@ public class HiveMetaStoreBridge {
processInst.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
processQualifiedName);
processInst.setAttribute(ATTRIBUTE_NAME, query);
- processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME,
clusterName);
+ processInst.setAttribute(ATTRIBUTE_CLUSTER_NAME,
metadataNamespace);
processInst.setAttribute(ATTRIBUTE_INPUTS,
Collections.singletonList(BaseHiveEvent.getObjectId(pathInst)));
processInst.setAttribute(ATTRIBUTE_OUTPUTS,
Collections.singletonList(BaseHiveEvent.getObjectId(tableInst)));
processInst.setAttribute(ATTRIBUTE_USER_NAME,
table.getOwner());
@@ -396,7 +408,7 @@ public class HiveMetaStoreBridge {
Database db = hiveClient.getDatabase(databaseName);
if (db != null) {
- ret = findDatabase(clusterName, databaseName);
+ ret = findDatabase(metadataNamespace, databaseName);
if (ret == null) {
ret = registerInstance(new
AtlasEntityWithExtInfo(toDbEntity(db)));
@@ -542,12 +554,12 @@ public class HiveMetaStoreBridge {
String dbName = hiveDB.getName().toLowerCase();
- dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getDBQualifiedName(clusterName, dbName));
+ dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getDBQualifiedName(metadataNamespace, dbName));
dbEntity.setAttribute(ATTRIBUTE_NAME, dbName);
dbEntity.setAttribute(ATTRIBUTE_DESCRIPTION, hiveDB.getDescription());
dbEntity.setAttribute(ATTRIBUTE_OWNER, hiveDB.getOwnerName());
- dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+ dbEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
dbEntity.setAttribute(ATTRIBUTE_LOCATION,
HdfsNameServiceResolver.getPathWithNameServiceID(hiveDB.getLocationUri()));
dbEntity.setAttribute(ATTRIBUTE_PARAMETERS, hiveDB.getParameters());
@@ -574,7 +586,7 @@ public class HiveMetaStoreBridge {
}
AtlasEntity tableEntity = table.getEntity();
- String tableQualifiedName = getTableQualifiedName(clusterName,
hiveTable);
+ String tableQualifiedName =
getTableQualifiedName(metadataNamespace, hiveTable);
long createTime =
BaseHiveEvent.getTableCreateTime(hiveTable);
long lastAccessTime = hiveTable.getLastAccessTime() > 0 ?
hiveTable.getLastAccessTime() : createTime;
@@ -705,7 +717,7 @@ public class HiveMetaStoreBridge {
Path path = new Path(pathUri);
ret.setAttribute(ATTRIBUTE_NAME,
Path.getPathWithoutSchemeAndAuthority(path).toString());
- ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, clusterName);
+ ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
if (StringUtils.isNotEmpty(nameServiceID)) {
// Name service resolution is successful, now get updated HDFS
path where the host port info is replaced by resolved name service
@@ -717,7 +729,7 @@ public class HiveMetaStoreBridge {
} else {
ret.setAttribute(ATTRIBUTE_PATH, pathUri);
- // Only append clusterName for the HDFS path
+ // Only append metadataNamespace for the HDFS path
if (pathUri.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getHdfsPathQualifiedName(pathUri));
} else {
@@ -731,18 +743,18 @@ public class HiveMetaStoreBridge {
/**
* Gets the atlas entity for the database
* @param databaseName database Name
- * @param clusterName cluster name
+ * @param metadataNamespace cluster name
* @return AtlasEntity for database if exists, else null
* @throws Exception
*/
- private AtlasEntityWithExtInfo findDatabase(String clusterName, String
databaseName) throws Exception {
+ private AtlasEntityWithExtInfo findDatabase(String metadataNamespace,
String databaseName) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Searching Atlas for database {}", databaseName);
}
String typeName = HiveDataTypes.HIVE_DB.getName();
- return findEntity(typeName, getDBQualifiedName(clusterName,
databaseName));
+ return findEntity(typeName, getDBQualifiedName(metadataNamespace,
databaseName));
}
/**
@@ -758,7 +770,7 @@ public class HiveMetaStoreBridge {
}
String typeName = HiveDataTypes.HIVE_TABLE.getName();
- String tblQualifiedName = getTableQualifiedName(getClusterName(),
hiveTable.getDbName(), hiveTable.getTableName());
+ String tblQualifiedName =
getTableQualifiedName(getMetadataNamespace(), hiveTable.getDbName(),
hiveTable.getTableName());
return findEntity(typeName, tblQualifiedName);
}
@@ -822,37 +834,37 @@ public class HiveMetaStoreBridge {
/**
* Construct the qualified name used to uniquely identify a Table instance
in Atlas.
- * @param clusterName Name of the cluster to which the Hive component
belongs
+ * @param metadataNamespace Metadata namespace of the cluster to which the
Hive component belongs
* @param table hive table for which the qualified name is needed
* @return Unique qualified name to identify the Table instance in Atlas.
*/
- private static String getTableQualifiedName(String clusterName, Table
table) {
- return getTableQualifiedName(clusterName, table.getDbName(),
table.getTableName(), table.isTemporary());
+ private static String getTableQualifiedName(String metadataNamespace,
Table table) {
+ return getTableQualifiedName(metadataNamespace, table.getDbName(),
table.getTableName(), table.isTemporary());
}
private String getHdfsPathQualifiedName(String hdfsPath) {
- return String.format("%s@%s", hdfsPath, clusterName);
+ return String.format("%s@%s", hdfsPath, metadataNamespace);
}
/**
* Construct the qualified name used to uniquely identify a Database
instance in Atlas.
- * @param clusterName Name of the cluster to which the Hive component
belongs
+ * @param metadataNamespace Name of the cluster to which the Hive
component belongs
* @param dbName Name of the Hive database
* @return Unique qualified name to identify the Database instance in
Atlas.
*/
- public static String getDBQualifiedName(String clusterName, String dbName)
{
- return String.format("%s@%s", dbName.toLowerCase(), clusterName);
+ public static String getDBQualifiedName(String metadataNamespace, String
dbName) {
+ return String.format("%s@%s", dbName.toLowerCase(), metadataNamespace);
}
/**
* Construct the qualified name used to uniquely identify a Table instance
in Atlas.
- * @param clusterName Name of the cluster to which the Hive component
belongs
+ * @param metadataNamespace Name of the cluster to which the Hive
component belongs
* @param dbName Name of the Hive database to which the Table belongs
* @param tableName Name of the Hive table
* @param isTemporaryTable is this a temporary table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
- public static String getTableQualifiedName(String clusterName, String
dbName, String tableName, boolean isTemporaryTable) {
+ public static String getTableQualifiedName(String metadataNamespace,
String dbName, String tableName, boolean isTemporaryTable) {
String tableTempName = tableName;
if (isTemporaryTable) {
@@ -863,11 +875,11 @@ public class HiveMetaStoreBridge {
}
}
- return String.format("%s.%s@%s", dbName.toLowerCase(),
tableTempName.toLowerCase(), clusterName);
+ return String.format("%s.%s@%s", dbName.toLowerCase(),
tableTempName.toLowerCase(), metadataNamespace);
}
- public static String getTableProcessQualifiedName(String clusterName,
Table table) {
- String tableQualifiedName = getTableQualifiedName(clusterName, table);
+ public static String getTableProcessQualifiedName(String
metadataNamespace, Table table) {
+ String tableQualifiedName = getTableQualifiedName(metadataNamespace,
table);
long createdTime = getTableCreatedTime(table);
return tableQualifiedName + SEP + createdTime;
@@ -876,24 +888,24 @@ public class HiveMetaStoreBridge {
/**
* Construct the qualified name used to uniquely identify a Table instance
in Atlas.
- * @param clusterName Name of the cluster to which the Hive component
belongs
+ * @param metadataNamespace Metadata namespace of the cluster to which the
Hive component belongs
* @param dbName Name of the Hive database to which the Table belongs
* @param tableName Name of the Hive table
* @return Unique qualified name to identify the Table instance in Atlas.
*/
- public static String getTableQualifiedName(String clusterName, String
dbName, String tableName) {
- return getTableQualifiedName(clusterName, dbName, tableName, false);
+ public static String getTableQualifiedName(String metadataNamespace,
String dbName, String tableName) {
+ return getTableQualifiedName(metadataNamespace, dbName, tableName,
false);
}
public static String getStorageDescQFName(String tableQualifiedName) {
return tableQualifiedName + "_storage";
}
public static String getColumnQualifiedName(final String
tableQualifiedName, final String colName) {
- final String[] parts = tableQualifiedName.split("@");
- final String tableName = parts[0];
- final String clusterName = parts[1];
+ final String[] parts = tableQualifiedName.split("@");
+ final String tableName = parts[0];
+ final String metadataNamespace = parts[1];
- return String.format("%s.%s@%s", tableName, colName.toLowerCase(),
clusterName);
+ return String.format("%s.%s@%s", tableName, colName.toLowerCase(),
metadataNamespace);
}
public static long getTableCreatedTime(Table table) {
@@ -945,4 +957,4 @@ public class HiveMetaStoreBridge {
}
return ret;
}
-}
+}
\ No newline at end of file
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index 76d6fe6..0eee7c1 100644
---
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -38,12 +38,12 @@ import static
org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable;
public class AtlasHiveHookContext {
- public static final char QNAME_SEP_CLUSTER_NAME = '@';
- public static final char QNAME_SEP_ENTITY_NAME = '.';
- public static final char QNAME_SEP_PROCESS = ':';
- public static final String TEMP_TABLE_PREFIX = "_temp-";
- public static final String CREATE_OPERATION = "CREATE";
- public static final String ALTER_OPERATION = "ALTER";
+ public static final char QNAME_SEP_METADATA_NAMESPACE = '@';
+ public static final char QNAME_SEP_ENTITY_NAME = '.';
+ public static final char QNAME_SEP_PROCESS = ':';
+ public static final String TEMP_TABLE_PREFIX = "_temp-";
+ public static final String CREATE_OPERATION = "CREATE";
+ public static final String ALTER_OPERATION = "ALTER";
private final HiveHook hook;
private final HiveOperation hiveOperation;
@@ -157,8 +157,8 @@ public class AtlasHiveHookContext {
public Collection<AtlasEntity> getEntities() { return
qNameEntityMap.values(); }
- public String getClusterName() {
- return hook.getClusterName();
+ public String getMetadataNamespace() {
+ return hook.getMetadataNamespace();
}
public String getHostName() { return hook.getHostName(); }
@@ -192,7 +192,7 @@ public class AtlasHiveHookContext {
}
public String getQualifiedName(Database db) {
- return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
getClusterName();
+ return (db.getName() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
getMetadataNamespace();
}
public String getQualifiedName(Table table) {
@@ -206,7 +206,7 @@ public class AtlasHiveHookContext {
}
}
- return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+ return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
}
public boolean isKnownDatabase(String dbQualifiedName) {
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index ffa56ce..5b1f61b 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -55,7 +55,6 @@ public class HiveHook extends AtlasHook implements
ExecuteWithHookContext {
public enum PreprocessAction { NONE, IGNORE, PRUNE }
public static final String CONF_PREFIX =
"atlas.hook.hive.";
- public static final String CONF_CLUSTER_NAME =
"atlas.cluster.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE =
CONF_PREFIX + "hdfs_path.convert_to_lowercase";
public static final String HOOK_NAME_CACHE_ENABLED =
CONF_PREFIX + "name.cache.enabled";
public static final String HOOK_NAME_CACHE_DATABASE_COUNT =
CONF_PREFIX + "name.cache.database.count";
@@ -66,13 +65,10 @@ public class HiveHook extends AtlasHook implements
ExecuteWithHookContext {
public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN
= CONF_PREFIX + "hive_table.ignore.pattern";
public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN
= CONF_PREFIX + "hive_table.prune.pattern";
public static final String HOOK_HIVE_TABLE_CACHE_SIZE
= CONF_PREFIX + "hive_table.cache.size";
-
- public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String DEFAULT_HOST_NAME = "localhost";
private static final Map<String, HiveOperation> OPERATION_MAP = new
HashMap<>();
- private static final String clusterName;
private static final boolean convertHdfsPathToLowerCase;
private static final boolean nameCacheEnabled;
private static final int nameCacheDatabaseMaxCount;
@@ -96,7 +92,6 @@ public class HiveHook extends AtlasHook implements
ExecuteWithHookContext {
OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation);
}
- clusterName =
atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
convertHdfsPathToLowerCase =
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
nameCacheEnabled =
atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true);
nameCacheDatabaseMaxCount =
atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
@@ -253,10 +248,6 @@ public class HiveHook extends AtlasHook implements
ExecuteWithHookContext {
}
}
- public String getClusterName() {
- return clusterName;
- }
-
public boolean isConvertHdfsPathToLowerCase() {
return convertHdfsPathToLowerCase;
}
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index b3663da..98b4d4f 100644
---
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -62,7 +62,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static
org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_CLUSTER_NAME;
+import static
org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_METADATA_NAMESPACE;
import static
org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_ENTITY_NAME;
import static
org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_PROCESS;
@@ -350,7 +350,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_DESCRIPTION, db.getDescription());
ret.setAttribute(ATTRIBUTE_OWNER, db.getOwnerName());
- ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
+ ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getMetadataNamespace());
ret.setAttribute(ATTRIBUTE_LOCATION,
HdfsNameServiceResolver.getPathWithNameServiceID(db.getLocationUri()));
ret.setAttribute(ATTRIBUTE_PARAMETERS, db.getParameters());
@@ -596,7 +596,8 @@ public abstract class BaseHiveEvent {
protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo)
{
AtlasEntity ret;
- String strPath = path.toString();
+ String strPath = path.toString();
+ String metadataNamespace = getMetadataNamespace();
if (strPath.startsWith(HDFS_PATH_PREFIX) &&
context.isConvertHdfsPathToLowerCase()) {
strPath = strPath.toLowerCase();
@@ -604,8 +605,8 @@ public abstract class BaseHiveEvent {
if (isS3Path(strPath)) {
String bucketName = path.toUri().getAuthority();
- String bucketQualifiedName = (path.toUri().getScheme() +
SCHEME_SEPARATOR + path.toUri().getAuthority() +
QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
- String pathQualifiedName = (strPath +
QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+ String bucketQualifiedName = (path.toUri().getScheme() +
SCHEME_SEPARATOR + path.toUri().getAuthority() +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
+ String pathQualifiedName = (strPath +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
AtlasEntity bucketEntity =
context.getEntity(bucketQualifiedName);
ret = context.getEntity(pathQualifiedName);
@@ -654,7 +655,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_PATH, attrPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, name);
- ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
+ ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
context.putEntity(pathQualifiedName, ret);
}
@@ -751,8 +752,8 @@ public abstract class BaseHiveEvent {
return hiveDDL;
}
- protected String getClusterName() {
- return context.getClusterName();
+ protected String getMetadataNamespace() {
+ return context.getMetadataNamespace();
}
protected Database getDatabases(String dbName) throws Exception {
@@ -870,7 +871,7 @@ public abstract class BaseHiveEvent {
protected String getQualifiedName(Table table, FieldSchema column) {
String tblQualifiedName = getQualifiedName(table);
- int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME);
+ int sepPos =
tblQualifiedName.lastIndexOf(QNAME_SEP_METADATA_NAMESPACE);
if (sepPos == -1) {
return tblQualifiedName + QNAME_SEP_ENTITY_NAME +
column.getName().toLowerCase();
@@ -888,19 +889,20 @@ public abstract class BaseHiveEvent {
}
protected String getQualifiedName(BaseColumnInfo column) {
- String dbName = column.getTabAlias().getTable().getDbName();
- String tableName = column.getTabAlias().getTable().getTableName();
- String colName = column.getColumn() != null ?
column.getColumn().getName() : null;
+ String dbName = column.getTabAlias().getTable().getDbName();
+ String tableName =
column.getTabAlias().getTable().getTableName();
+ String colName = column.getColumn() != null ?
column.getColumn().getName() : null;
+ String metadataNamespace = getMetadataNamespace();
if (colName == null) {
- return (dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName();
+ return (dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
} else {
- return (dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
getClusterName();
+ return (dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
metadataNamespace;
}
}
protected String getQualifiedName(String dbName, String tableName, String
colName) {
- return (dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
getClusterName();
+ return (dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
getMetadataNamespace();
}
protected String getQualifiedName(URI location) {
@@ -918,14 +920,14 @@ public abstract class BaseHiveEvent {
protected String getQualifiedName(String path) {
if (path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
- return path + QNAME_SEP_CLUSTER_NAME + getClusterName();
+ return path + QNAME_SEP_METADATA_NAMESPACE +
getMetadataNamespace();
}
return path.toLowerCase();
}
protected String getColumnQualifiedName(String tblQualifiedName, String
columnName) {
- int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME);
+ int sepPos =
tblQualifiedName.lastIndexOf(QNAME_SEP_METADATA_NAMESPACE);
if (sepPos == -1) {
return tblQualifiedName + QNAME_SEP_ENTITY_NAME +
columnName.toLowerCase();
@@ -980,26 +982,27 @@ public abstract class BaseHiveEvent {
}
protected AtlasEntity toReferencedHBaseTable(Table table,
AtlasEntitiesWithExtInfo entities) {
- AtlasEntity ret = null;
- HBaseTableInfo hBaseTableInfo = new HBaseTableInfo(table);
- String hbaseNameSpace = hBaseTableInfo.getHbaseNameSpace();
- String hbaseTableName = hBaseTableInfo.getHbaseTableName();
+ AtlasEntity ret = null;
+ HBaseTableInfo hBaseTableInfo = new HBaseTableInfo(table);
+ String hbaseNameSpace = hBaseTableInfo.getHbaseNameSpace();
+ String hbaseTableName = hBaseTableInfo.getHbaseTableName();
+ String metadataNamespace = getMetadataNamespace();
if (hbaseTableName != null) {
AtlasEntity nsEntity = new AtlasEntity(HBASE_TYPE_NAMESPACE);
nsEntity.setAttribute(ATTRIBUTE_NAME, hbaseNameSpace);
- nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName());
- nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getHBaseNameSpaceQualifiedName(getClusterName(), hbaseNameSpace));
+ nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
+ nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getHBaseNameSpaceQualifiedName(metadataNamespace, hbaseNameSpace));
ret = new AtlasEntity(HBASE_TYPE_TABLE);
ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName);
ret.setAttribute(ATTRIBUTE_URI, hbaseTableName);
- AtlasRelatedObjectId objIdRelatedObject = new
AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
+ AtlasRelatedObjectId objIdRelatedObject = new
AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE,
objIdRelatedObject);
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName));
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getHBaseTableQualifiedName(metadataNamespace, hbaseNameSpace, hbaseTableName));
entities.addReferredEntity(nsEntity);
entities.addEntity(ret);
@@ -1021,12 +1024,12 @@ public abstract class BaseHiveEvent {
return ret;
}
- private static String getHBaseTableQualifiedName(String clusterName,
String nameSpace, String tableName) {
- return String.format("%s:%s@%s", nameSpace.toLowerCase(),
tableName.toLowerCase(), clusterName);
+ private static String getHBaseTableQualifiedName(String metadataNamespace,
String nameSpace, String tableName) {
+ return String.format("%s:%s@%s", nameSpace.toLowerCase(),
tableName.toLowerCase(), metadataNamespace);
}
- private static String getHBaseNameSpaceQualifiedName(String clusterName,
String nameSpace) {
- return String.format("%s@%s", nameSpace.toLowerCase(), clusterName);
+ private static String getHBaseNameSpaceQualifiedName(String
metadataNamespace, String nameSpace) {
+ return String.format("%s@%s", nameSpace.toLowerCase(),
metadataNamespace);
}
private boolean ignoreHDFSPathsinProcessQualifiedName() {
diff --git
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
index cbee7bf..0736153 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
@@ -519,7 +519,7 @@ public class HiveITBase {
Table outTable = entity.getTable();
//refresh table
outTable =
dgiBridge.getHiveClient().getTable(outTable.getDbName(),
outTable.getTableName());
- return
HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(),
outTable);
+ return
HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getMetadataNamespace(),
outTable);
}
}
diff --git
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index d55aa53..4403aaf 100644
---
a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -53,10 +53,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class HiveMetaStoreBridgeTest {
-
- private static final String TEST_DB_NAME = "default";
- public static final String CLUSTER_NAME = "primary";
- public static final String TEST_TABLE_NAME = "test_table";
+ private static final String TEST_DB_NAME = "default";
+ public static final String METADATA_NAMESPACE = "primary";
+ public static final String TEST_TABLE_NAME = "test_table";
@Mock
private Hive hiveClient;
@@ -90,13 +89,13 @@ public class HiveMetaStoreBridgeTest {
when(hiveClient.getDatabase(TEST_DB_NAME)).thenReturn(db);
when(hiveClient.getAllTables(TEST_DB_NAME)).thenReturn(Arrays.asList(new
String[]{}));
- returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+ returnExistingDatabase(TEST_DB_NAME, atlasClientV2,
METADATA_NAMESPACE);
when(atlasEntityWithExtInfo.getEntity("72e06b34-9151-4023-aa9d-b82103a50e76"))
.thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
getEntity(HiveDataTypes.HIVE_DB.getName(),
AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity());
- HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME,
hiveClient, atlasClientV2);
+ HiveMetaStoreBridge bridge = new
HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
bridge.importHiveMetadata(null, null, true);
// verify update is called
@@ -109,7 +108,7 @@ public class HiveMetaStoreBridgeTest {
List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME,
TEST_TABLE_NAME);
- returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+ returnExistingDatabase(TEST_DB_NAME, atlasClientV2,
METADATA_NAMESPACE);
// return existing table
@@ -119,7 +118,7 @@ public class HiveMetaStoreBridgeTest {
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME,
TEST_DB_NAME, TEST_TABLE_NAME))))
+ HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE,
TEST_DB_NAME, TEST_TABLE_NAME))))
.thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
getEntity(HiveDataTypes.HIVE_TABLE.getName(),
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
@@ -127,7 +126,7 @@ public class HiveMetaStoreBridgeTest {
.thenReturn(createTableReference());
Table testTable = hiveTables.get(0);
- String processQualifiedName =
HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
+ String processQualifiedName =
HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable);
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -136,7 +135,7 @@ public class HiveMetaStoreBridgeTest {
getEntity(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
- HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME,
hiveClient, atlasClientV2);
+ HiveMetaStoreBridge bridge = new
HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
bridge.importHiveMetadata(null, null, true);
// verify update is called on table
@@ -144,13 +143,13 @@ public class HiveMetaStoreBridgeTest {
}
- private void returnExistingDatabase(String databaseName, AtlasClientV2
atlasClientV2, String clusterName)
+ private void returnExistingDatabase(String databaseName, AtlasClientV2
atlasClientV2, String metadataNamespace)
throws AtlasServiceException {
//getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID,
"72e06b34-9151-4023-aa9d-b82103a50e76");
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_DB.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME,
TEST_DB_NAME))))
+
HiveMetaStoreBridge.getDBQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME))))
.thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
getEntity(HiveDataTypes.HIVE_DB.getName(),
AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))));
@@ -179,16 +178,16 @@ public class HiveMetaStoreBridgeTest {
List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME,
TEST_TABLE_NAME);
Table hiveTable = hiveTables.get(0);
- returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+ returnExistingDatabase(TEST_DB_NAME, atlasClientV2,
METADATA_NAMESPACE);
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME,
TEST_TABLE_NAME))))
+
HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME,
TEST_TABLE_NAME))))
.thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
getEntity(HiveDataTypes.HIVE_TABLE.getName(),
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
- String processQualifiedName =
HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTable);
+ String processQualifiedName =
HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, hiveTable);
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -206,7 +205,7 @@ public class HiveMetaStoreBridgeTest {
when(hiveClient.getPartitions(hiveTable)).thenReturn(Arrays.asList(new
Partition[]{partition}));
- HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME,
hiveClient, atlasClientV2);
+ HiveMetaStoreBridge bridge = new
HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
try {
bridge.importHiveMetadata(null, null, true);
} catch (Exception e) {
@@ -220,12 +219,12 @@ public class HiveMetaStoreBridgeTest {
final String table2Name = TEST_TABLE_NAME + "_1";
List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME,
TEST_TABLE_NAME, table2Name);
- returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+ returnExistingDatabase(TEST_DB_NAME, atlasClientV2,
METADATA_NAMESPACE);
when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new
RuntimeException("Timeout while reading data from hive metastore"));
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, TEST_DB_NAME,
TEST_TABLE_NAME))))
+
HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE, TEST_DB_NAME,
TEST_TABLE_NAME))))
.thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
getEntity(HiveDataTypes.HIVE_TABLE.getName(),
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
@@ -233,7 +232,7 @@ public class HiveMetaStoreBridgeTest {
.thenReturn(createTableReference());
Table testTable = hiveTables.get(1);
- String processQualifiedName =
HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
+ String processQualifiedName =
HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable);
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -241,7 +240,7 @@ public class HiveMetaStoreBridgeTest {
.thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
getEntity(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
- HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME,
hiveClient, atlasClientV2);
+ HiveMetaStoreBridge bridge = new
HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
try {
bridge.importHiveMetadata(null, null, false);
} catch (Exception e) {
@@ -255,13 +254,13 @@ public class HiveMetaStoreBridgeTest {
final String table2Name = TEST_TABLE_NAME + "_1";
List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME,
TEST_TABLE_NAME, table2Name);
- returnExistingDatabase(TEST_DB_NAME, atlasClientV2, CLUSTER_NAME);
+ returnExistingDatabase(TEST_DB_NAME, atlasClientV2,
METADATA_NAMESPACE);
when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new
RuntimeException("Timeout while reading data from hive metastore"));
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_TABLE.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME,
TEST_DB_NAME, TEST_TABLE_NAME))))
+ HiveMetaStoreBridge.getTableQualifiedName(METADATA_NAMESPACE,
TEST_DB_NAME, TEST_TABLE_NAME))))
.thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
getEntity(HiveDataTypes.HIVE_TABLE.getName(),
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
@@ -270,7 +269,7 @@ public class HiveMetaStoreBridgeTest {
.thenReturn(createTableReference());
Table testTable = hiveTables.get(1);
- String processQualifiedName =
HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable);
+ String processQualifiedName =
HiveMetaStoreBridge.getTableProcessQualifiedName(METADATA_NAMESPACE, testTable);
when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(),
Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
@@ -278,7 +277,7 @@ public class HiveMetaStoreBridgeTest {
.thenReturn(new AtlasEntity.AtlasEntityWithExtInfo(
getEntity(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.GUID, "82e06b34-9151-4023-aa9d-b82103a50e77")));
- HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME,
hiveClient, atlasClientV2);
+ HiveMetaStoreBridge bridge = new
HiveMetaStoreBridge(METADATA_NAMESPACE, hiveClient, atlasClientV2);
try {
bridge.importHiveMetadata(null, null, true);
Assert.fail("Table registration is supposed to fail");
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
index 1305f65..51b2f83 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
@@ -33,9 +33,9 @@ import org.apache.commons.lang.StringUtils;
* Contain the info related to an linear record from Impala
*/
public class AtlasImpalaHookContext {
- public static final char QNAME_SEP_CLUSTER_NAME = '@';
- public static final char QNAME_SEP_ENTITY_NAME = '.';
- public static final char QNAME_SEP_PROCESS = ':';
+ public static final char QNAME_SEP_METADATA_NAMESPACE = '@';
+ public static final char QNAME_SEP_ENTITY_NAME = '.';
+ public static final char QNAME_SEP_PROCESS = ':';
private final ImpalaLineageHook hook;
private final ImpalaOperationType impalaOperation;
@@ -69,8 +69,8 @@ public class AtlasImpalaHookContext {
public Collection<AtlasEntity> getEntities() { return
qNameEntityMap.values(); }
- public String getClusterName() {
- return hook.getClusterName();
+ public String getMetadataNamespace() {
+ return hook.getMetadataNamespace();
}
public String getHostName() {
@@ -82,7 +82,7 @@ public class AtlasImpalaHookContext {
}
public String getQualifiedNameForDb(String dbName) {
- return (dbName + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
getClusterName();
+ return (dbName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
getMetadataNamespace();
}
public String getQualifiedNameForTable(String fullTableName) throws
IllegalArgumentException {
@@ -100,8 +100,7 @@ public class AtlasImpalaHookContext {
}
public String getQualifiedNameForTable(String dbName, String tableName) {
- return (dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_CLUSTER_NAME).toLowerCase() +
- getClusterName();
+ return (dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
}
public String getQualifiedNameForColumn(LineageVertex vertex) {
@@ -179,7 +178,7 @@ public class AtlasImpalaHookContext {
public String getQualifiedNameForColumn(String dbName, String tableName,
String columnName) {
return
(dbName + QNAME_SEP_ENTITY_NAME + tableName +
QNAME_SEP_ENTITY_NAME +
- columnName + QNAME_SEP_CLUSTER_NAME).toLowerCase() +
getClusterName();
+ columnName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
getMetadataNamespace();
}
public String getUserName() { return lineageQuery.getUser(); }
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index b5fdb6d..10ae08f 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -18,7 +18,6 @@
package org.apache.atlas.impala.hook;
-import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
import java.net.InetAddress;
import java.net.UnknownHostException;
import com.google.common.collect.Sets;
@@ -42,20 +41,17 @@ public class ImpalaLineageHook extends AtlasHook {
public static final String ATLAS_ENDPOINT =
"atlas.rest.address";
public static final String REALM_SEPARATOR = "@";
public static final String CONF_PREFIX =
"atlas.hook.impala.";
- public static final String CONF_CLUSTER_NAME =
"atlas.cluster.name";
public static final String CONF_REALM_NAME =
"atlas.realm.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE =
CONF_PREFIX + "hdfs_path.convert_to_lowercase";
public static final String DEFAULT_HOST_NAME =
"localhost";
- private static final String clusterName;
- private static final String realm;
+ private static final String realm;
private static final boolean convertHdfsPathToLowerCase;
- private static String hostName;
+ private static String hostName;
static {
- clusterName =
atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
- realm =
atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what
should default be ??
- convertHdfsPathToLowerCase =
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
+ realm =
atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what
should default be ??
+ convertHdfsPathToLowerCase =
atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
try {
hostName = InetAddress.getLocalHost().getHostName();
@@ -143,10 +139,6 @@ public class ImpalaLineageHook extends AtlasHook {
return UserGroupInformation.getUGIFromSubject(userSubject);
}
- public String getClusterName() {
- return clusterName;
- }
-
public String getRealm() {
return realm;
}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
index 4ea484f..5329e8b 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
@@ -340,7 +340,7 @@ public abstract class BaseImpalaEvent {
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, dbName.toLowerCase());
- ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getClusterName());
+ ret.setAttribute(ATTRIBUTE_CLUSTER_NAME,
context.getMetadataNamespace());
context.putEntity(dbQualifiedName, ret);
}
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
index f1d0237..600ca5e 100644
---
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageITBase.java
@@ -293,7 +293,7 @@ public class ImpalaLineageITBase {
protected String assertDatabaseIsRegistered(String dbName, AssertPredicate
assertPredicate) throws Exception {
LOG.debug("Searching for database: {}", dbName);
- String dbQualifiedName = dbName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ String dbQualifiedName = dbName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME;
dbQualifiedName = dbQualifiedName.toLowerCase();
@@ -320,7 +320,7 @@ public class ImpalaLineageITBase {
protected String assertTableIsRegistered(String fullTableName,
AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
LOG.debug("Searching for table {}", fullTableName);
- String tableQualifiedName = (fullTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME).toLowerCase() +
+ String tableQualifiedName = (fullTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
CLUSTER_NAME;
return assertEntityIsRegistered(HIVE_TYPE_TABLE,
REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
index 8ebb385..25e26e8 100644
---
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolIT.java
@@ -77,7 +77,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// the value is from info in IMPALA_3
String createTime = new Long((long)(1554750072)*1000).toString();
String processQFName =
- "db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ "db_1.view_1" +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime;
processQFName = processQFName.toLowerCase();
@@ -140,7 +140,7 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
Long afterCreateTime = System.currentTimeMillis() /
BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
String processQFNameWithoutTime =
- dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS;
processQFNameWithoutTime = processQFNameWithoutTime.toLowerCase();
@@ -210,7 +210,7 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
// the value is from info in IMPALA_4.
String createTime = new Long(TABLE_CREATE_TIME*1000).toString();
String processQFName =
- dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime;
processQFName = processQFName.toLowerCase();
@@ -266,7 +266,7 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
// the value is from info in IMPALA_4.
String createTime = new Long(TABLE_CREATE_TIME*1000).toString();
String processQFName =
- dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime;
processQFName = processQFName.toLowerCase();
@@ -322,9 +322,9 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
// the value is from info in IMPALA_4.
String createTime1 = new
Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
- String sourceQFName = dbName + "." + sourceTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ String sourceQFName = dbName + "." + sourceTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime1;
- String targetQFName = dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ String targetQFName = dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime2;
String processQFName = "QUERY:" + sourceQFName.toLowerCase() +
"->:INSERT:" + targetQFName.toLowerCase();
@@ -385,9 +385,9 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
// the value is from info in IMPALA_4.
String createTime1 = new
Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
- String sourceQFName = dbName + "." + sourceTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ String sourceQFName = dbName + "." + sourceTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime1;
- String targetQFName = dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ String targetQFName = dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime2;
String processQFName = "QUERY:" + sourceQFName.toLowerCase() +
"->:INSERT:" + targetQFName.toLowerCase();
@@ -454,7 +454,7 @@ public class ImpalaLineageToolIT extends
ImpalaLineageITBase {
// the value is from info in IMPALA_4.
String createTime = new Long((long)1560885039*1000).toString();
String processQFName =
- dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ dbName + "." + targetTableName +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime;
processQFName = processQFName.toLowerCase();
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
index a7b9b0c..56d74fe 100644
---
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookIT.java
@@ -137,7 +137,7 @@ public class ImpalaLineageHookIT extends
ImpalaLineageITBase {
impalaHook.process(queryObj);
String createTime = new
Long(BaseImpalaEvent.getTableCreateTime(vertex5)).toString();
String processQFName =
- vertex5.getVertexId() +
AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME +
+ vertex5.getVertexId() +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS +
createTime;
processQFName = processQFName.toLowerCase();
diff --git a/addons/kafka-bridge/src/bin/import-kafka.sh
b/addons/kafka-bridge/src/bin/import-kafka.sh
index 4bfb8b9..076eae8 100644
--- a/addons/kafka-bridge/src/bin/import-kafka.sh
+++ b/addons/kafka-bridge/src/bin/import-kafka.sh
@@ -117,7 +117,7 @@ else
exit 1
fi
-CP="${KAFKA_CP}:${ATLASCPPATH}:${HADOOP_CP}"
+CP="${ATLASCPPATH}:${HADOOP_CP}:${KAFKA_CP}"
# If running in cygwin, convert pathnames and classpath to Windows format.
if [ "${CYGWIN}" == "true" ]
diff --git
a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 8755c9e..40b1fee 100644
---
a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++
b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -58,19 +58,20 @@ import java.util.regex.Pattern;
public class KafkaBridge {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaBridge.class);
- private static final int EXIT_CODE_SUCCESS = 0;
- private static final int EXIT_CODE_FAILED = 1;
- private static final String ATLAS_ENDPOINT =
"atlas.rest.address";
- private static final String DEFAULT_ATLAS_URL =
"http://localhost:21000/";
- private static final String KAFKA_CLUSTER_NAME =
"atlas.cluster.name";
- private static final String DEFAULT_CLUSTER_NAME = "primary";
- private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
- private static final String DESCRIPTION_ATTR = "description";
- private static final String PARTITION_COUNT = "partitionCount";
- private static final String NAME = "name";
- private static final String URI = "uri";
- private static final String CLUSTERNAME = "clusterName";
- private static final String TOPIC = "topic";
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_FAILED = 1;
+ private static final String ATLAS_ENDPOINT =
"atlas.rest.address";
+ private static final String DEFAULT_ATLAS_URL =
"http://localhost:21000/";
+ private static final String CLUSTER_NAME_KEY =
"atlas.cluster.name";
+ private static final String KAFKA_METADATA_NAMESPACE =
"atlas.metadata.namespace";
+ private static final String DEFAULT_CLUSTER_NAME = "primary";
+ private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ private static final String DESCRIPTION_ATTR = "description";
+ private static final String PARTITION_COUNT = "partitionCount";
+ private static final String NAME = "name";
+ private static final String URI = "uri";
+ private static final String CLUSTERNAME = "clusterName";
+ private static final String TOPIC = "topic";
private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME =
"%s@%s";
private static final String ZOOKEEPER_CONNECT =
"atlas.kafka.zookeeper.connect";
@@ -81,7 +82,7 @@ public class KafkaBridge {
private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 *
1000;
private final List<String> availableTopics;
- private final String clusterName;
+ private final String metadataNamespace;
private final AtlasClientV2 atlasClientV2;
private final ZkUtils zkUtils;
@@ -163,10 +164,18 @@ public class KafkaBridge {
int connectionTimeOutMs =
atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS,
DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
ZkClient zkClient = new ZkClient(zookeeperConnect,
sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$);
- this.atlasClientV2 = atlasClientV2;
- this.clusterName = atlasConf.getString(KAFKA_CLUSTER_NAME,
DEFAULT_CLUSTER_NAME);
- this.zkUtils = new ZkUtils(zkClient, new
ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
- this.availableTopics =
scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
+ this.atlasClientV2 = atlasClientV2;
+ this.metadataNamespace = getMetadataNamespace(atlasConf);
+ this.zkUtils = new ZkUtils(zkClient, new
ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
+ this.availableTopics =
scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
+ }
+
+ private String getMetadataNamespace(Configuration config) {
+ return config.getString(KAFKA_METADATA_NAMESPACE,
getClusterName(config));
+ }
+
+ private String getClusterName(Configuration config) {
+ return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
}
public void importTopic(String topicToImport) throws Exception {
@@ -191,7 +200,7 @@ public class KafkaBridge {
@VisibleForTesting
AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
- String topicQualifiedName =
getTopicQualifiedName(clusterName, topic);
+ String topicQualifiedName =
getTopicQualifiedName(metadataNamespace, topic);
AtlasEntityWithExtInfo topicEntity =
findTopicEntityInAtlas(topicQualifiedName);
if (topicEntity == null) {
@@ -225,10 +234,10 @@ public class KafkaBridge {
ret = topicEntity;
}
- String qualifiedName = getTopicQualifiedName(clusterName, topic);
+ String qualifiedName = getTopicQualifiedName(metadataNamespace, topic);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
- ret.setAttribute(CLUSTERNAME, clusterName);
+ ret.setAttribute(CLUSTERNAME, metadataNamespace);
ret.setAttribute(TOPIC, topic);
ret.setAttribute(NAME,topic);
ret.setAttribute(DESCRIPTION_ATTR, topic);
@@ -239,8 +248,8 @@ public class KafkaBridge {
}
@VisibleForTesting
- static String getTopicQualifiedName(String clusterName, String topic) {
- return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME,
topic.toLowerCase(), clusterName);
+ static String getTopicQualifiedName(String metadataNamespace, String
topic) {
+ return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME,
topic.toLowerCase(), metadataNamespace);
}
private AtlasEntityWithExtInfo findTopicEntityInAtlas(String
topicQualifiedName) {
diff --git
a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 5397a4b..3ccd426 100644
---
a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++
b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -52,8 +52,9 @@ import java.util.Date;
public class SqoopHook extends SqoopJobDataPublisher {
private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
- public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
- public static final String DEFAULT_CLUSTER_NAME = "primary";
+ public static final String CLUSTER_NAME_KEY =
"atlas.cluster.name";
+ public static final String ATLAS_METADATA_NAMESPACE =
"atlas.metadata.namespace";
+ public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String USER = "userName";
public static final String DB_STORE_TYPE = "dbStoreType";
@@ -80,12 +81,14 @@ public class SqoopHook extends SqoopJobDataPublisher {
@Override
public void publish(SqoopJobDataPublisher.Data data) throws
AtlasHookException {
try {
- Configuration atlasProperties = ApplicationProperties.get();
- String clusterName =
atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
- AtlasEntity entDbStore = toSqoopDBStoreEntity(data);
- AtlasEntity entHiveDb = toHiveDatabaseEntity(clusterName,
data.getHiveDB());
- AtlasEntity entHiveTable = data.getHiveTable() != null ?
toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
- AtlasEntity entProcess = toSqoopProcessEntity(entDbStore,
entHiveDb, entHiveTable, data, clusterName);
+ Configuration atlasProperties = ApplicationProperties.get();
+ String metadataNamespace =
atlasProperties.getString(ATLAS_METADATA_NAMESPACE,
getClusterName(atlasProperties));
+
+ AtlasEntity entDbStore = toSqoopDBStoreEntity(data);
+ AtlasEntity entHiveDb = toHiveDatabaseEntity(metadataNamespace,
data.getHiveDB());
+ AtlasEntity entHiveTable = data.getHiveTable() != null ?
toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
+ AtlasEntity entProcess = toSqoopProcessEntity(entDbStore,
entHiveDb, entHiveTable, data, metadataNamespace);
+
AtlasEntitiesWithExtInfo entities = new
AtlasEntitiesWithExtInfo(entProcess);
@@ -105,11 +108,15 @@ public class SqoopHook extends SqoopJobDataPublisher {
}
}
- private AtlasEntity toHiveDatabaseEntity(String clusterName, String
dbName) {
+ private String getClusterName(Configuration config) {
+ return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+ }
+
+ private AtlasEntity toHiveDatabaseEntity(String metadataNamespace, String
dbName) {
AtlasEntity entHiveDb = new
AtlasEntity(HiveDataTypes.HIVE_DB.getName());
- String qualifiedName =
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
+ String qualifiedName =
HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName);
- entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
clusterName);
+ entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
metadataNamespace);
entHiveDb.setAttribute(AtlasClient.NAME, dbName);
entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
qualifiedName);
@@ -153,9 +160,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
return entDbStore;
}
- private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore,
AtlasEntity entHiveDb, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data
data, String clusterName) {
+ private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore,
AtlasEntity entHiveDb, AtlasEntity entHiveTable,
+ SqoopJobDataPublisher.Data data,
String metadataNamespace) {
AtlasEntity entProcess = new
AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
- String sqoopProcessName = getSqoopProcessName(data,
clusterName);
+ String sqoopProcessName = getSqoopProcessName(data,
metadataNamespace);
Map<String, String> sqoopOptionsMap = new HashMap<>();
Properties options = data.getOptions();
@@ -190,7 +198,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
return data.getOperation().toLowerCase().equals("import");
}
- static String getSqoopProcessName(Data data, String clusterName) {
+ static String getSqoopProcessName(Data data, String metadataNamespace) {
StringBuilder name = new StringBuilder(String.format("sqoop %s
--connect %s", data.getOperation(), data.getUrl()));
if (StringUtils.isNotEmpty(data.getHiveTable())) {
@@ -204,9 +212,9 @@ public class SqoopHook extends SqoopJobDataPublisher {
}
if (data.getHiveTable() != null) {
- name.append(String.format(" --hive-%s --hive-database %s
--hive-table %s --hive-cluster %s", data.getOperation(),
data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(),
clusterName));
+ name.append(String.format(" --hive-%s --hive-database %s
--hive-table %s --hive-cluster %s", data.getOperation(),
data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(),
metadataNamespace));
} else {
- name.append(String.format("--hive-%s --hive-database %s
--hive-cluster %s", data.getOperation(), data.getHiveDB(), clusterName));
+ name.append(String.format("--hive-%s --hive-database %s
--hive-cluster %s", data.getOperation(), data.getHiveDB(), metadataNamespace));
}
return name.toString();
diff --git
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 97668a3..517a3c3 100644
---
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -118,7 +118,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
topology.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
topologyInfo.get_name());
topology.setAttribute(AtlasClient.OWNER, owner);
topology.setAttribute("startTime", new
Date(System.currentTimeMillis()));
- topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
getClusterName(stormConf));
+ topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
getMetadataNamespace());
return topology;
}
@@ -166,9 +166,9 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
}
private AtlasEntity addDataSet(String dataSetType, String topologyOwner,
Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
- Map<String, String> config =
StormTopologyUtil.getFieldValues(instance, true, null);
- String clusterName = null;
- AtlasEntity ret = null;
+ Map<String, String> config =
StormTopologyUtil.getFieldValues(instance, true, null);
+ AtlasEntity ret = null;
+ String metadataNamespace = getMetadataNamespace();
// todo: need to redo this with a config driven approach
switch (dataSetType) {
@@ -188,8 +188,6 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
topologyOwner = ANONYMOUS_OWNER;
}
- clusterName = getClusterName(stormConf);
-
if (topicName == null) {
LOG.error("Kafka topic name not found");
} else {
@@ -198,7 +196,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
ret.setAttribute("topic", topicName);
ret.setAttribute("uri", uri);
ret.setAttribute(AtlasClient.OWNER, topologyOwner);
- ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getKafkaTopicQualifiedName(clusterName, topicName));
+ ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getKafkaTopicQualifiedName(metadataNamespace, topicName));
ret.setAttribute(AtlasClient.NAME, topicName);
}
}
@@ -212,7 +210,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
uri = hbaseTableName;
}
- clusterName =
extractComponentClusterName(HBaseConfiguration.create(), stormConf);
+ metadataNamespace =
extractComponentMetadataNamespace(HBaseConfiguration.create(), stormConf);
if (hbaseTableName == null) {
LOG.error("HBase table name not found");
@@ -223,7 +221,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
ret.setAttribute(AtlasClient.NAME, uri);
ret.setAttribute(AtlasClient.OWNER,
stormConf.get("storm.kerberos.principal"));
//TODO - Hbase Namespace is hardcoded to 'default'. need
to check how to get this or is it already part of tableName
- ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
hbaseTableName));
+ ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getHbaseTableQualifiedName(metadataNamespace, HBASE_NAMESPACE_DEFAULT,
hbaseTableName));
}
}
break;
@@ -234,11 +232,9 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
final Path hdfsPath = new Path(hdfsPathStr);
final String nameServiceID =
HdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
- clusterName = getClusterName(stormConf);
-
ret = new AtlasEntity(HiveMetaStoreBridge.HDFS_PATH);
- ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
getClusterName(stormConf));
+ ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
metadataNamespace);
ret.setAttribute(AtlasClient.OWNER,
stormConf.get("hdfs.kerberos.principal"));
ret.setAttribute(AtlasClient.NAME,
Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
@@ -247,16 +243,16 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
ret.setAttribute("path", updatedPath);
ret.setAttribute("nameServiceId", nameServiceID);
- ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getHdfsPathQualifiedName(clusterName, updatedPath));
+ ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getHdfsPathQualifiedName(metadataNamespace, updatedPath));
} else {
ret.setAttribute("path", hdfsPathStr);
- ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getHdfsPathQualifiedName(clusterName, hdfsPathStr));
+ ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getHdfsPathQualifiedName(metadataNamespace, hdfsPathStr));
}
}
break;
case "HiveBolt": {
- clusterName = extractComponentClusterName(new HiveConf(),
stormConf);
+ metadataNamespace = extractComponentMetadataNamespace(new
HiveConf(), stormConf);
final String dbName =
config.get("HiveBolt.options.databaseName");
final String tblName =
config.get("HiveBolt.options.tableName");
@@ -267,8 +263,8 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
AtlasEntity dbEntity = new AtlasEntity("hive_db");
dbEntity.setAttribute(AtlasClient.NAME, dbName);
-
dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName));
-
dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE,
getClusterName(stormConf));
+
dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName));
+
dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace);
entityExtInfo.addReferredEntity(dbEntity);
@@ -277,7 +273,7 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
ret.setAttribute(AtlasClient.NAME, tblName);
ret.setAttribute(ATTRIBUTE_DB,
AtlasTypeUtil.getAtlasObjectId(dbEntity));
- ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName));
+ ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName));
}
}
break;
@@ -384,30 +380,25 @@ public class StormAtlasHook extends AtlasHook implements
ISubmitterHook {
}
}
- public static String getKafkaTopicQualifiedName(String clusterName, String
topicName) {
- return String.format("%s@%s", topicName.toLowerCase(), clusterName);
+ public static String getKafkaTopicQualifiedName(String metadataNamespace,
String topicName) {
+ return String.format("%s@%s", topicName.toLowerCase(),
metadataNamespace);
}
- public static String getHbaseTableQualifiedName(String clusterName, String
nameSpace, String tableName) {
- return String.format("%s.%s@%s", nameSpace.toLowerCase(),
tableName.toLowerCase(), clusterName);
+ public static String getHbaseTableQualifiedName(String metadataNamespace,
String nameSpace, String tableName) {
+ return String.format("%s.%s@%s", nameSpace.toLowerCase(),
tableName.toLowerCase(), metadataNamespace);
}
- public static String getHdfsPathQualifiedName(String clusterName, String
hdfsPath) {
- return String.format("%s@%s", hdfsPath.toLowerCase(), clusterName);
+ public static String getHdfsPathQualifiedName(String metadataNamespace,
String hdfsPath) {
+ return String.format("%s@%s", hdfsPath.toLowerCase(),
metadataNamespace);
}
- private String getClusterName(Map stormConf) {
- return atlasProperties.getString(AtlasConstants.CLUSTER_NAME_KEY,
AtlasConstants.DEFAULT_CLUSTER_NAME);
- }
-
- private String extractComponentClusterName(Configuration configuration,
Map stormConf) {
- String clusterName =
configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
+ private String extractComponentMetadataNamespace(Configuration
configuration, Map stormConf) {
+ String clusterName = configuration.get(CLUSTER_NAME_KEY, null);
if (clusterName == null) {
- clusterName = getClusterName(stormConf);
+ clusterName = getMetadataNamespace();
}
return clusterName;
}
-
-}
+}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java
b/common/src/main/java/org/apache/atlas/AtlasConstants.java
index 2b9f411..2b7b4b3 100644
--- a/common/src/main/java/org/apache/atlas/AtlasConstants.java
+++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java
@@ -25,16 +25,16 @@ public final class AtlasConstants {
private AtlasConstants() {
}
- public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
- public static final String DEFAULT_CLUSTER_NAME = "primary";
- public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
- public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port";
- public static final String DEFAULT_APP_PORT_STR = "21000";
- public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address";
- public static final String DEFAULT_ATLAS_REST_ADDRESS =
"http://localhost:21000";
- public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
- public static final String DEFAULT_TYPE_VERSION = "1.0";
-
+ public static final String CLUSTER_NAME_KEY =
"atlas.cluster.name";
+ public static final String SYSTEM_PROPERTY_APP_PORT =
"atlas.app.port";
+ public static final String ATLAS_REST_ADDRESS_KEY =
"atlas.rest.address";
public static final String ATLAS_MIGRATION_MODE_FILENAME =
"atlas.migration.data.filename";
public static final String ATLAS_SERVICES_ENABLED =
"atlas.services.enabled";
+
+ public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
+ public static final String DEFAULT_APP_PORT_STR = "21000";
+ public static final String DEFAULT_ATLAS_REST_ADDRESS =
"http://localhost:21000";
+ public static final String DEFAULT_TYPE_VERSION = "1.0";
+ public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
+
}
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 0030276..cc6546b 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -59,10 +59,14 @@ public abstract class AtlasHook {
public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY
= "atlas.notification.failed.messages.filename";
public static final String
ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY =
"atlas.notification.log.failed.messages";
public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME
= "atlas_hook_failed_messages.log";
+ public static final String CONF_METADATA_NAMESPACE
= "atlas.metadata.namespace";
+ public static final String CLUSTER_NAME_KEY
= "atlas.cluster.name";
+ public static final String DEFAULT_CLUSTER_NAME
= "primary";
protected static Configuration atlasProperties;
protected static NotificationInterface notificationInterface;
+ private static final String metadataNamespace;
private static final int SHUTDOWN_HOOK_WAIT_TIME_MS =
3000;
private static final boolean logFailedMessages;
private static final FailedMessagesLogger failedMessagesLogger;
@@ -95,6 +99,7 @@ public abstract class AtlasHook {
}
}
+ metadataNamespace = getMetadataNamespace(atlasProperties);
notificationMaxRetries =
atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
notificationRetryInterval =
atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
notificationInterface = NotificationProvider.get();
@@ -306,4 +311,15 @@ public abstract class AtlasHook {
return ret;
}
-}
+ private static String getMetadataNamespace(Configuration config) {
+ return config.getString(CONF_METADATA_NAMESPACE,
getClusterName(config));
+ }
+
+ private static String getClusterName(Configuration config) {
+ return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
+ }
+
+ public String getMetadataNamespace() {
+ return metadataNamespace;
+ }
+}
\ No newline at end of file