Repository: atlas Updated Branches: refs/heads/master f42c1d9ff -> 7f5a665e4
ATLAS-2525: updated HBase, Hive hooks to enable import namespaces/databases/tables listed in a file Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/7f5a665e Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/7f5a665e Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/7f5a665e Branch: refs/heads/master Commit: 7f5a665e4d93322511813e58fde6175af27834f2 Parents: f42c1d9 Author: rmani <[email protected]> Authored: Tue Apr 3 17:27:11 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Apr 3 18:29:12 2018 -0700 ---------------------------------------------------------------------- addons/hbase-bridge/src/bin/import-hbase.sh | 2 +- .../apache/atlas/hbase/bridge/HBaseBridge.java | 661 +++++++++++++++++++ .../atlas/hbase/util/ImportHBaseEntities.java | 101 --- .../hbase/util/ImportHBaseEntitiesBase.java | 417 ------------ .../atlas/hive/bridge/HiveMetaStoreBridge.java | 46 +- 5 files changed, 705 insertions(+), 522 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/7f5a665e/addons/hbase-bridge/src/bin/import-hbase.sh ---------------------------------------------------------------------- diff --git a/addons/hbase-bridge/src/bin/import-hbase.sh b/addons/hbase-bridge/src/bin/import-hbase.sh index 07a3af4..0a5989f 100644 --- a/addons/hbase-bridge/src/bin/import-hbase.sh +++ b/addons/hbase-bridge/src/bin/import-hbase.sh @@ -134,7 +134,7 @@ done echo "Log file for import is $LOGFILE" -"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hbase.util.ImportHBaseEntities $allargs +"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hbase.bridge.HBaseBridge $allargs RETVAL=$? [ $RETVAL -eq 0 ] && echo HBase Data Model imported successfully!!! http://git-wip-us.apache.org/repos/asf/atlas/blob/7f5a665e/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java ---------------------------------------------------------------------- diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java new file mode 100644 index 0000000..a546ff7 --- /dev/null +++ b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/bridge/HBaseBridge.java @@ -0,0 +1,661 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hbase.bridge; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.hbase.model.HBaseDataTypes; +import org.apache.atlas.hook.AtlasHookException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +public class HBaseBridge { + private static final Logger LOG = LoggerFactory.getLogger(HBaseBridge.class); + + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_FAILED = 1; + private static final String ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final String HBASE_CLUSTER_NAME = "atlas.cluster.name"; + private static final String DEFAULT_CLUSTER_NAME = "primary"; + private static final String QUALIFIED_NAME = "qualifiedName"; + private static final String NAME = "name"; + private static final String URI = "uri"; + private static final String OWNER = "owner"; + private static final String DESCRIPTION_ATTR = "description"; + private static final String CLUSTERNAME = "clusterName"; + private static final String NAMESPACE = "namespace"; + private static final String TABLE = "table"; + private static final String COLUMN_FAMILIES = "column_families"; + + // table metadata + private static final String ATTR_TABLE_MAX_FILESIZE = "maxFileSize"; + private static final String ATTR_TABLE_ISREADONLY = "isReadOnly"; + private static final String ATTR_TABLE_ISCOMPACTION_ENABLED = "isCompactionEnabled"; + private static final String ATTR_TABLE_REPLICATION_PER_REGION = "replicasPerRegion"; + private static final String ATTR_TABLE_DURABLILITY = "durability"; + + // column family metadata + private static final String ATTR_CF_BLOOMFILTER_TYPE = "bloomFilterType"; + private static final String ATTR_CF_COMPRESSION_TYPE = "compressionType"; + private static final String ATTR_CF_COMPACTION_COMPRESSION_TYPE = "compactionCompressionType"; + private static final String ATTR_CF_ENCRYPTION_TYPE = "encryptionType"; + private static final String ATTR_CF_KEEP_DELETE_CELLS = "keepDeletedCells"; + private static final String ATTR_CF_MAX_VERSIONS = "maxVersions"; + private static final String ATTR_CF_MIN_VERSIONS = "minVersions"; + private static final String ATTR_CF_DATA_BLOCK_ENCODING = "dataBlockEncoding"; + private static final String ATTR_CF_TTL = "ttl"; + private static final String ATTR_CF_BLOCK_CACHE_ENABLED = "blockCacheEnabled"; + private static final String ATTR_CF_CACHED_BLOOM_ON_WRITE = "cacheBloomsOnWrite"; + private static final String ATTR_CF_CACHED_DATA_ON_WRITE = "cacheDataOnWrite"; + private static final String ATTR_CF_CACHED_INDEXES_ON_WRITE = "cacheIndexesOnWrite"; + private static final String ATTR_CF_EVICT_BLOCK_ONCLOSE = "evictBlocksOnClose"; + private static final String ATTR_CF_PREFETCH_BLOCK_ONOPEN = "prefetchBlocksOnOpen"; + private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + + private static final String HBASE_NAMESPACE_QUALIFIED_NAME = "%s@%s"; + private static final String HBASE_TABLE_QUALIFIED_NAME_FORMAT = "%s:%s@%s"; + private static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT = "%s:%s.%s@%s"; + + private final String clusterName; + private final AtlasClientV2 atlasClientV2; + private final HBaseAdmin hbaseAdmin; + + + public static void main(String[] args) { + int exitCode = EXIT_CODE_FAILED; + + try { + Options options = new Options(); + options.addOption("n","namespace", true, "namespace"); + options.addOption("t", "table", true, "tablename"); + options.addOption("f", "filename", true, "filename"); + + CommandLineParser parser = new BasicParser(); + CommandLine cmd = parser.parse(options, args); + String namespaceToImport = cmd.getOptionValue("n"); + String tableToImport = cmd.getOptionValue("t"); + String fileToImport = cmd.getOptionValue("f"); + Configuration atlasConf = ApplicationProperties.get(); + String[] urls = atlasConf.getStringArray(ATLAS_ENDPOINT); + + if (urls == null || urls.length == 0) { + urls = new String[] { DEFAULT_ATLAS_URL }; + } + + final AtlasClientV2 atlasClientV2; + + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); + + atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword); + } else { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls); + } + + HBaseBridge importer = new HBaseBridge(atlasConf, atlasClientV2); + + if (StringUtils.isNotEmpty(fileToImport)) { + File f = new File(fileToImport); + + if (f.exists() && f.canRead()) { + BufferedReader br = new BufferedReader(new FileReader(f)); + String line = null; + + while((line = br.readLine()) != null) { + String val[] = line.split(":"); + + if (ArrayUtils.isNotEmpty(val)) { + namespaceToImport = val[0]; + + if (val.length > 1) { + tableToImport = val[1]; + } else { + tableToImport = ""; + } + + importer.importHBaseEntities(namespaceToImport, tableToImport); + } + } + + exitCode = EXIT_CODE_SUCCESS; + } else { + LOG.error("Failed to read the file"); + } + } else { + importer.importHBaseEntities(namespaceToImport, tableToImport); + + exitCode = EXIT_CODE_SUCCESS; + } + } catch(ParseException e) { + LOG.error("Failed to parse arguments. Error: ", e.getMessage()); + printUsage(); + } catch(Exception e) { + System.out.println("ImportHBaseEntities failed. Please check the log file for the detailed error message"); + + LOG.error("ImportHBaseEntities failed", e); + } + + System.exit(exitCode); + } + + public HBaseBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception { + this.atlasClientV2 = atlasClientV2; + this.clusterName = atlasConf.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + + org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); + + LOG.info("checking HBase availability.."); + + HBaseAdmin.checkHBaseAvailable(conf); + + LOG.info("HBase is available"); + + hbaseAdmin = new HBaseAdmin(conf); + } + + private boolean importHBaseEntities(String namespaceToImport, String tableToImport) throws Exception { + boolean ret = false; + + if (StringUtils.isEmpty(namespaceToImport) && StringUtils.isEmpty(tableToImport)) { + // when both NameSpace and Table options are not present + importNameSpaceAndTable(); + ret = true; + } else if (StringUtils.isNotEmpty(namespaceToImport)) { + // When Namespace option is present or both namespace and table options are present + importNameSpaceWithTable(namespaceToImport, tableToImport); + ret = true; + } else if (StringUtils.isNotEmpty(tableToImport)) { + importTable(tableToImport); + ret = true; + } + + return ret; + } + + public void importNameSpace(final String nameSpace) throws Exception { + List<NamespaceDescriptor> matchingNameSpaceDescriptors = getMatchingNameSpaces(nameSpace); + + if (CollectionUtils.isNotEmpty(matchingNameSpaceDescriptors)) { + for (NamespaceDescriptor namespaceDescriptor : matchingNameSpaceDescriptors) { + createOrUpdateNameSpace(namespaceDescriptor); + } + } else { + throw new AtlasHookException("No NameSpace found for the given criteria. NameSpace = " + nameSpace); + } + } + + public void importTable(final String tableName) throws Exception { + String tableNameStr = null; + HTableDescriptor[] htds = hbaseAdmin.listTables(Pattern.compile(tableName)); + + if (ArrayUtils.isNotEmpty(htds)) { + for (HTableDescriptor htd : htds) { + String tblNameWithNameSpace = htd.getTableName().getNameWithNamespaceInclAsString(); + String tblNameWithOutNameSpace = htd.getTableName().getNameAsString(); + + if (tableName.equals(tblNameWithNameSpace)) { + tableNameStr = tblNameWithNameSpace; + } else if (tableName.equals(tblNameWithOutNameSpace)) { + tableNameStr = tblNameWithOutNameSpace; + } else { + // when wild cards are used in table name + if (tblNameWithNameSpace != null) { + tableNameStr = tblNameWithNameSpace; + } else if (tblNameWithOutNameSpace != null) { + tableNameStr = tblNameWithOutNameSpace; + } + } + + byte[] nsByte = htd.getTableName().getNamespace(); + String nsName = new String(nsByte); + NamespaceDescriptor nsDescriptor = hbaseAdmin.getNamespaceDescriptor(nsName); + AtlasEntityWithExtInfo entity = createOrUpdateNameSpace(nsDescriptor); + HColumnDescriptor[] hcdts = htd.getColumnFamilies(); + + createOrUpdateTable(nsName, tableNameStr, entity.getEntity(), htd, hcdts); + } + } else { + throw new AtlasHookException("No Table found for the given criteria. Table = " + tableName); + } + } + + private void importNameSpaceAndTable() throws Exception { + NamespaceDescriptor[] namespaceDescriptors = hbaseAdmin.listNamespaceDescriptors(); + + if (ArrayUtils.isNotEmpty(namespaceDescriptors)) { + for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) { + String namespace = namespaceDescriptor.getName(); + + importNameSpace(namespace); + } + } + + HTableDescriptor[] htds = hbaseAdmin.listTables(); + + if (ArrayUtils.isNotEmpty(htds)) { + for (HTableDescriptor htd : htds) { + String tableName = htd.getNameAsString(); + + importTable(tableName); + } + } + } + + private void importNameSpaceWithTable(String namespaceToImport, String tableToImport) throws Exception { + importNameSpace(namespaceToImport); + + List<HTableDescriptor> hTableDescriptors = new ArrayList<>(); + + if (StringUtils.isEmpty(tableToImport)) { + List<NamespaceDescriptor> matchingNameSpaceDescriptors = getMatchingNameSpaces(namespaceToImport); + + if (CollectionUtils.isNotEmpty(matchingNameSpaceDescriptors)) { + hTableDescriptors = getTableDescriptors(matchingNameSpaceDescriptors); + } + } else { + tableToImport = namespaceToImport +":" + tableToImport; + + HTableDescriptor[] htds = hbaseAdmin.listTables(Pattern.compile(tableToImport)); + + hTableDescriptors.addAll(Arrays.asList(htds)); + } + + if (CollectionUtils.isNotEmpty(hTableDescriptors)) { + for (HTableDescriptor htd : hTableDescriptors) { + String tblName = htd.getTableName().getNameAsString(); + + importTable(tblName); + } + } + } + + private List<NamespaceDescriptor> getMatchingNameSpaces(String nameSpace) throws Exception { + List<NamespaceDescriptor> ret = new ArrayList<>(); + NamespaceDescriptor[] namespaceDescriptors = hbaseAdmin.listNamespaceDescriptors(); + + for (NamespaceDescriptor namespaceDescriptor:namespaceDescriptors){ + String nmSpace = namespaceDescriptor.getName(); + + if (nmSpace.matches(nameSpace)){ + ret.add(namespaceDescriptor); + } + } + + return ret; + } + + private List<HTableDescriptor> getTableDescriptors(List<NamespaceDescriptor> namespaceDescriptors) throws Exception { + List<HTableDescriptor> ret = new ArrayList<>(); + + for(NamespaceDescriptor namespaceDescriptor:namespaceDescriptors) { + HTableDescriptor[] tableDescriptors = hbaseAdmin.listTableDescriptorsByNamespace(namespaceDescriptor.getName()); + + ret.addAll(Arrays.asList(tableDescriptors)); + } + + return ret; + } + + protected AtlasEntityWithExtInfo createOrUpdateNameSpace(NamespaceDescriptor namespaceDescriptor) throws Exception { + String nsName = namespaceDescriptor.getName(); + String nsQualifiedName = getNameSpaceQualifiedName(clusterName, nsName); + AtlasEntityWithExtInfo nsEntity = findNameSpaceEntityInAtlas(nsQualifiedName); + + if (nsEntity == null) { + LOG.info("Importing NameSpace: " + nsQualifiedName); + + AtlasEntity entity = getNameSpaceEntity(nsName, null); + + nsEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity)); + } else { + LOG.info("NameSpace already present in Atlas. Updating it..: " + nsQualifiedName); + + AtlasEntity entity = getNameSpaceEntity(nsName, nsEntity.getEntity()); + + nsEntity.setEntity(entity); + + nsEntity = updateEntityInAtlas(nsEntity); + } + return nsEntity; + } + + protected AtlasEntityWithExtInfo createOrUpdateTable(String nameSpace, String tableName, AtlasEntity nameSapceEntity, HTableDescriptor htd, HColumnDescriptor[] hcdts) throws Exception { + String owner = htd.getOwnerString(); + String tblQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName); + AtlasEntityWithExtInfo ret = findTableEntityInAtlas(tblQualifiedName); + + if (ret == null) { + LOG.info("Importing Table: " + tblQualifiedName); + + AtlasEntity entity = getTableEntity(nameSpace, tableName, owner, nameSapceEntity, htd, null); + + ret = createEntityInAtlas(new AtlasEntityWithExtInfo(entity)); + } else { + LOG.info("Table already present in Atlas. Updating it..: " + tblQualifiedName); + + AtlasEntity entity = getTableEntity(nameSpace, tableName, owner, nameSapceEntity, htd, ret.getEntity()); + + ret.setEntity(entity); + + ret = updateEntityInAtlas(ret); + } + + AtlasEntity tableEntity = ret.getEntity(); + + if (tableEntity != null) { + List<AtlasEntityWithExtInfo> cfEntities = createOrUpdateColumnFamilies(nameSpace, tableName, owner, hcdts, tableEntity); + + List<AtlasObjectId> cfIDs = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(cfEntities)) { + for (AtlasEntityWithExtInfo cfEntity : cfEntities) { + cfIDs.add(AtlasTypeUtil.getAtlasObjectId(cfEntity.getEntity())); + } + } + + tableEntity.setAttribute(COLUMN_FAMILIES, cfIDs); + } + + return ret; + } + + protected List<AtlasEntityWithExtInfo> createOrUpdateColumnFamilies(String nameSpace, String tableName, String owner, HColumnDescriptor[] hcdts , AtlasEntity tableEntity) throws Exception { + List<AtlasEntityWithExtInfo > ret = new ArrayList<>(); + + if (hcdts != null) { + AtlasObjectId tableId = AtlasTypeUtil.getAtlasObjectId(tableEntity); + + for (HColumnDescriptor columnFamilyDescriptor : hcdts) { + String cfName = columnFamilyDescriptor.getNameAsString(); + String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName); + AtlasEntityWithExtInfo cfEntity = findColumnFamiltyEntityInAtlas(cfQualifiedName); + + if (cfEntity == null) { + LOG.info("Importing Column-family: " + cfQualifiedName); + + AtlasEntity entity = getColumnFamilyEntity(nameSpace, tableName, owner, columnFamilyDescriptor, tableId, null); + + cfEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity)); + } else { + LOG.info("ColumnFamily already present in Atlas. Updating it..: " + cfQualifiedName); + + AtlasEntity entity = getColumnFamilyEntity(nameSpace, tableName, owner, columnFamilyDescriptor, tableId, cfEntity.getEntity()); + + cfEntity.setEntity(entity); + + cfEntity = updateEntityInAtlas(cfEntity); + } + + ret.add(cfEntity); + } + } + + return ret; + } + + private AtlasEntityWithExtInfo findNameSpaceEntityInAtlas(String nsQualifiedName) { + AtlasEntityWithExtInfo ret = null; + + try { + ret = findEntityInAtlas(HBaseDataTypes.HBASE_NAMESPACE.getName(), nsQualifiedName); + } catch (Exception e) { + ret = null; // entity doesn't exist in Atlas + } + + return ret; + } + + private AtlasEntityWithExtInfo findTableEntityInAtlas(String tableQualifiedName) { + AtlasEntityWithExtInfo ret = null; + + try { + ret = findEntityInAtlas(HBaseDataTypes.HBASE_TABLE.getName(), tableQualifiedName); + } catch (Exception e) { + ret = null; // entity doesn't exist in Atlas + } + + return ret; + } + + private AtlasEntityWithExtInfo findColumnFamiltyEntityInAtlas(String columnFamilyQualifiedName) { + AtlasEntityWithExtInfo ret = null; + + try { + ret = findEntityInAtlas(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName(), columnFamilyQualifiedName); + } catch (Exception e) { + ret = null; // entity doesn't exist in Atlas + } + + return ret; + } + + private AtlasEntityWithExtInfo findEntityInAtlas(String typeName, String qualifiedName) throws Exception { + Map<String, String> attributes = Collections.singletonMap(QUALIFIED_NAME, qualifiedName); + + return atlasClientV2.getEntityByAttribute(typeName, attributes); + } + + private AtlasEntity getNameSpaceEntity(String nameSpace, AtlasEntity nsEtity) { + AtlasEntity ret = null ; + + if (nsEtity == null) { + ret = new AtlasEntity(HBaseDataTypes.HBASE_NAMESPACE.getName()); + } else { + ret = nsEtity; + } + + String qualifiedName = getNameSpaceQualifiedName(clusterName, nameSpace); + + ret.setAttribute(QUALIFIED_NAME, qualifiedName); + ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(NAME, nameSpace); + ret.setAttribute(DESCRIPTION_ATTR, nameSpace); + + return ret; + } + + private AtlasEntity getTableEntity(String nameSpace, String tableName, String owner, AtlasEntity nameSpaceEntity, HTableDescriptor htd, AtlasEntity atlasEntity) { + AtlasEntity ret = null; + + if (atlasEntity == null) { + ret = new AtlasEntity(HBaseDataTypes.HBASE_TABLE.getName()); + } else { + ret = atlasEntity; + } + + String tableQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName); + + ret.setAttribute(QUALIFIED_NAME, tableQualifiedName); + ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpaceEntity)); + ret.setAttribute(NAME, tableName); + ret.setAttribute(DESCRIPTION_ATTR, tableName); + ret.setAttribute(OWNER, owner); + ret.setAttribute(URI, tableName); + ret.setAttribute(ATTR_TABLE_MAX_FILESIZE, htd.getMaxFileSize()); + ret.setAttribute(ATTR_TABLE_REPLICATION_PER_REGION, htd.getRegionReplication()); + ret.setAttribute(ATTR_TABLE_ISREADONLY, htd.isReadOnly()); + ret.setAttribute(ATTR_TABLE_ISCOMPACTION_ENABLED, htd.isCompactionEnabled()); + ret.setAttribute(ATTR_TABLE_DURABLILITY, (htd.getDurability() != null ? htd.getDurability().name() : null)); + + return ret; + } + + private AtlasEntity getColumnFamilyEntity(String nameSpace, String tableName, String owner, HColumnDescriptor hcdt, AtlasObjectId tableId, AtlasEntity atlasEntity){ + AtlasEntity ret = null; + + if (atlasEntity == null) { + ret = new AtlasEntity(HBaseDataTypes.HBASE_COLUMN_FAMILY.getName()); + } else { + ret = atlasEntity; + } + + String cfName = hcdt.getNameAsString(); + String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName); + + ret.setAttribute(QUALIFIED_NAME, cfQualifiedName); + ret.setAttribute(CLUSTERNAME, clusterName); + ret.setAttribute(TABLE, tableId); + ret.setAttribute(NAME, cfName); + ret.setAttribute(DESCRIPTION_ATTR, cfName); + ret.setAttribute(OWNER, owner); + ret.setAttribute(ATTR_CF_BLOCK_CACHE_ENABLED, hcdt.isBlockCacheEnabled()); + ret.setAttribute(ATTR_CF_BLOOMFILTER_TYPE, (hcdt.getBloomFilterType() != null ? hcdt.getBloomFilterType().name():null)); + ret.setAttribute(ATTR_CF_CACHED_BLOOM_ON_WRITE, hcdt.isCacheBloomsOnWrite()); + ret.setAttribute(ATTR_CF_CACHED_DATA_ON_WRITE, hcdt.isCacheDataOnWrite()); + ret.setAttribute(ATTR_CF_CACHED_INDEXES_ON_WRITE, hcdt.isCacheIndexesOnWrite()); + ret.setAttribute(ATTR_CF_COMPACTION_COMPRESSION_TYPE, (hcdt.getCompactionCompressionType() != null ? hcdt.getCompactionCompressionType().name():null)); + ret.setAttribute(ATTR_CF_COMPRESSION_TYPE, (hcdt.getCompressionType() != null ? hcdt.getCompressionType().name():null)); + ret.setAttribute(ATTR_CF_DATA_BLOCK_ENCODING, (hcdt.getDataBlockEncoding() != null ? hcdt.getDataBlockEncoding().name():null)); + ret.setAttribute(ATTR_CF_ENCRYPTION_TYPE, hcdt.getEncryptionType()); + ret.setAttribute(ATTR_CF_EVICT_BLOCK_ONCLOSE, hcdt.isEvictBlocksOnClose()); + ret.setAttribute(ATTR_CF_KEEP_DELETE_CELLS, ( hcdt.getKeepDeletedCells() != null ? hcdt.getKeepDeletedCells().name():null)); + ret.setAttribute(ATTR_CF_MAX_VERSIONS, hcdt.getMaxVersions()); + ret.setAttribute(ATTR_CF_MIN_VERSIONS, hcdt.getMinVersions()); + ret.setAttribute(ATTR_CF_PREFETCH_BLOCK_ONOPEN, hcdt.isPrefetchBlocksOnOpen()); + ret.setAttribute(ATTR_CF_TTL, hcdt.getTimeToLive()); + + return ret; + } + + private AtlasEntityWithExtInfo createEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception { + AtlasEntityWithExtInfo ret = null; + EntityMutationResponse response = atlasClientV2.createEntity(entity); + List<AtlasEntityHeader> entities = response.getCreatedEntities(); + + if (CollectionUtils.isNotEmpty(entities)) { + AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid()); + + ret = getByGuidResponse; + + LOG.info("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid()); + } + return ret; + } + + private AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception { + AtlasEntityWithExtInfo ret = null; + EntityMutationResponse response = atlasClientV2.updateEntity(entity); + + if (response != null) { + List<AtlasEntityHeader> entities = response.getUpdatedEntities(); + + if (CollectionUtils.isNotEmpty(entities)) { + AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid()); + + ret = getByGuidResponse; + + LOG.info("Updated {} entity: name={}, guid={} ", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid()); + } else { + LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas" ); + ret = entity; + } + } else { + LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas" ); + ret = entity; + } + + return ret; + } + + /** + * Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas. + * @param clusterName Name of the cluster to which the Hbase component belongs + * @param nameSpace Name of the Hbase database to which the Table belongs + * @param tableName Name of the Hbase table + * @param columnFamily Name of the ColumnFamily + * @return Unique qualified name to identify the Table instance in Atlas. + */ + private static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) { + tableName = stripNameSpace(tableName.toLowerCase()); + return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(), clusterName); + } + + /** + * Construct the qualified name used to uniquely identify a Table instance in Atlas. + * @param clusterName Name of the cluster to which the Hbase component belongs + * @param nameSpace Name of the Hbase database to which the Table belongs + * @param tableName Name of the Hbase table + * @return Unique qualified name to identify the Table instance in Atlas. + */ + private static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) { + tableName = stripNameSpace(tableName.toLowerCase()); + return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, clusterName); + } + + /** + * Construct the qualified name used to uniquely identify a Hbase NameSpace instance in Atlas. + * @param clusterName Name of the cluster to which the Hbase component belongs + * @param nameSpace Name of the NameSpace + * @return Unique qualified name to identify the HBase NameSpace instance in Atlas. + */ + private static String getNameSpaceQualifiedName(String clusterName, String nameSpace) { + return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), clusterName); + } + + private static String stripNameSpace(String tableName){ + tableName = tableName.substring(tableName.indexOf(":")+1); + + return tableName; + } + + private static void printUsage() { + System.out.println("Usage 1: import-hbase.sh [-n <namespace regex> OR --namespace <namespace regex >] [-t <table regex > OR --table <table regex>]"); + System.out.println("Usage 2: import-hbase.sh [-f <filename>]" ); + System.out.println(" Format:"); + System.out.println(" namespace1:tbl1"); + System.out.println(" namespace1:tbl2"); + System.out.println(" namespace2:tbl1"); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/7f5a665e/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java ---------------------------------------------------------------------- diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java deleted file mode 100644 index d2e9f99..0000000 --- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntities.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.hbase.util; - - -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.hook.AtlasHookException; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class ImportHBaseEntities extends ImportHBaseEntitiesBase { - private static final Logger LOG = LoggerFactory.getLogger(ImportHBaseEntities.class); - - public static void main(String[] args) throws AtlasHookException { - try { - ImportHBaseEntities importHBaseEntities = new ImportHBaseEntities(args); - - importHBaseEntities.execute(); - } catch(Exception e) { - throw new AtlasHookException("ImportHBaseEntities failed.", e); - } - } - - public ImportHBaseEntities(String[] args) throws Exception { - super(args); - } - - public boolean execute() throws Exception { - boolean ret = false; - - if (hbaseAdmin != null) { - if (StringUtils.isEmpty(namespaceToImport) && StringUtils.isEmpty(tableToImport)) { - NamespaceDescriptor[] namespaceDescriptors = hbaseAdmin.listNamespaceDescriptors(); - if (!ArrayUtils.isEmpty(namespaceDescriptors)) { - for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) { - String namespace = namespaceDescriptor.getName(); - importNameSpace(namespace); - } - } - HTableDescriptor[] htds = hbaseAdmin.listTables(); - if (!ArrayUtils.isEmpty(htds)) { - for (HTableDescriptor htd : htds) { - String tableName = htd.getNameAsString(); - importTable(tableName); - } - } - ret = true; - } else if (StringUtils.isNotEmpty(namespaceToImport)) { - importNameSpace(namespaceToImport); - ret = true; - } else if (StringUtils.isNotEmpty(tableToImport)) { - importTable(tableToImport); - ret = true; - } - } - - return ret; - } - - public String importNameSpace(final String nameSpace) throws Exception { - NamespaceDescriptor namespaceDescriptor = hbaseAdmin.getNamespaceDescriptor(nameSpace); - - createOrUpdateNameSpace(namespaceDescriptor); - - return namespaceDescriptor.getName(); - } - - public String importTable(final String tableName) throws Exception { - byte[] tblName = tableName.getBytes(); - HTableDescriptor htd = hbaseAdmin.getTableDescriptor(tblName); - String nsName = htd.getTableName().getNameWithNamespaceInclAsString(); - NamespaceDescriptor nsDescriptor = hbaseAdmin.getNamespaceDescriptor(nsName); - AtlasEntity nsEntity = createOrUpdateNameSpace(nsDescriptor); - HColumnDescriptor[] hcdts = htd.getColumnFamilies(); - createOrUpdateTable(nsName, tableName, nsEntity, htd, hcdts); - - return htd.getTableName().getNameAsString(); - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/7f5a665e/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java ---------------------------------------------------------------------- diff --git a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java b/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java deleted file mode 100644 index 6c195b7..0000000 --- a/addons/hbase-bridge/src/main/java/org/apache/atlas/hbase/util/ImportHBaseEntitiesBase.java +++ /dev/null @@ -1,417 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.hbase.util; - -import org.apache.atlas.AtlasClientV2; -import org.apache.atlas.ApplicationProperties; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.model.instance.EntityMutations; -import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.apache.atlas.type.AtlasTypeUtil; -import org.apache.atlas.utils.AuthenticationUtil; -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.Options; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.configuration.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -public class ImportHBaseEntitiesBase { - private static final Logger LOG = LoggerFactory.getLogger(ImportHBaseEntitiesBase.class); - - static final String NAMESPACE_FLAG = "-n"; - static final String TABLE_FLAG = "-t"; - static final String NAMESPACE_FULL_FLAG = "--namespace"; - static final String TABLE_FULL_FLAG = "--tablename"; - static final String ATLAS_ENDPOINT = "atlas.rest.address"; - static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; - static final String NAMESPACE_TYPE = "hbase_namespace"; - static final String TABLE_TYPE = "hbase_table"; - static final String COLUMNFAMILY_TYPE = "hbase_column_family"; - static final String HBASE_CLUSTER_NAME = "atlas.cluster.name"; - static final String DEFAULT_CLUSTER_NAME = "primary"; - static final String QUALIFIED_NAME = "qualifiedName"; - static final String NAME = "name"; - static final String URI = "uri"; - static final String OWNER = "owner"; - static final String DESCRIPTION_ATTR = "description"; - static final String CLUSTERNAME = "clusterName"; - static final String NAMESPACE = "namespace"; - static final String TABLE = "table"; - static final String COLUMN_FAMILIES = "column_families"; - - // column addition metadata - public static final String ATTR_TABLE_MAX_FILESIZE = "maxFileSize"; - public static final String ATTR_TABLE_ISREADONLY = "isReadOnly"; - public static final String ATTR_TABLE_ISCOMPACTION_ENABLED = "isCompactionEnabled"; - public static final String ATTR_TABLE_REPLICATION_PER_REGION = "replicasPerRegion"; - public static final String ATTR_TABLE_DURABLILITY = "durability"; - - // column family additional metadata - public static final String ATTR_CF_BLOOMFILTER_TYPE = "bloomFilterType"; - public static final String ATTR_CF_COMPRESSION_TYPE = "compressionType"; - public static final String ATTR_CF_COMPACTION_COMPRESSION_TYPE = "compactionCompressionType"; - public static final String ATTR_CF_ENCRYPTION_TYPE = "encryptionType"; - public static final String ATTR_CF_KEEP_DELETE_CELLS = "keepDeletedCells"; - public static final String ATTR_CF_MAX_VERSIONS = "maxVersions"; - public static final String ATTR_CF_MIN_VERSIONS = "minVersions"; - public static final String ATTR_CF_DATA_BLOCK_ENCODING = "dataBlockEncoding"; - public static final String ATTR_CF_TTL = "ttl"; - public static final String ATTR_CF_BLOCK_CACHE_ENABLED = "blockCacheEnabled"; - public static final String ATTR_CF_CACHED_BLOOM_ON_WRITE = "cacheBloomsOnWrite"; - public static final String ATTR_CF_CACHED_DATA_ON_WRITE = "cacheDataOnWrite"; - public static final String ATTR_CF_CACHED_INDEXES_ON_WRITE = "cacheIndexesOnWrite"; - public static final String ATTR_CF_EVICT_BLOCK_ONCLOSE = "evictBlocksOnClose"; - public static final String ATTR_CF_PREFETCH_BLOCK_ONOPEN = "prefetchBlocksOnOpen"; - - public static final String HBASE_NAMESPACE_QUALIFIED_NAME = "%s@%s"; - public static final String HBASE_TABLE_QUALIFIED_NAME_FORMAT = "%s:%s@%s"; - public static final String HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT = "%s:%s.%s@%s"; - - protected final HBaseAdmin hbaseAdmin; - protected final boolean failOnError; - protected final String namespaceToImport; - protected final String tableToImport; - private final AtlasClientV2 atlasClientV2; - private final UserGroupInformation ugi; - private final String clusterName; - private final HashMap<String, AtlasEntity> nameSpaceCache = new HashMap<>(); - private final HashMap<String, AtlasEntity> tableCache = new HashMap<>(); - private final HashMap<String, AtlasEntity> columnFamilyCache = new HashMap<>(); - - - protected ImportHBaseEntitiesBase(String[] args) throws Exception { - checkArgs(args); - - Configuration atlasConf = ApplicationProperties.get(); - String[] urls = atlasConf.getStringArray(ATLAS_ENDPOINT); - - if (urls == null || urls.length == 0) { - urls = new String[]{DEFAULT_ATLAS_URL}; - } - - if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { - String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); - - ugi = null; - atlasClientV2 = new AtlasClientV2(urls, basicAuthUsernamePassword); - } else { - ugi = UserGroupInformation.getCurrentUser(); - atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls); - } - - Options options = new Options(); - options.addOption("n","namespace", true, "namespace"); - options.addOption("t", "table", true, "tablename"); - options.addOption("failOnError", false, "failOnError"); - - CommandLineParser parser = new BasicParser(); - CommandLine cmd = parser.parse(options, args); - - clusterName = atlasConf.getString(HBASE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); - failOnError = cmd.hasOption("failOnError"); - namespaceToImport = cmd.getOptionValue("n"); - tableToImport = cmd.getOptionValue("t"); - - org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); - - LOG.info("createHBaseClient(): checking HBase availability.."); - - HBaseAdmin.checkHBaseAvailable(conf); - - LOG.info("createHBaseClient(): HBase is available"); - - hbaseAdmin = new HBaseAdmin(conf); - } - - protected AtlasEntity createOrUpdateNameSpace(NamespaceDescriptor namespaceDescriptor) throws Exception { - String nsName = namespaceDescriptor.getName(); - String nsQualifiedName = getNameSpaceQualifiedName(clusterName, nsName); - AtlasEntity nsEntity = findNameSpaceEntityInAtlas(nsQualifiedName); - - if (nsEntity == null) { - LOG.info("Importing NameSpace: " + nsQualifiedName); - - AtlasEntity entity = getNameSpaceEntity(nsName); - - nsEntity = createEntityInAtlas(entity); - } - - return nsEntity; - } - - protected AtlasEntity createOrUpdateTable(String nameSpace, String tableName, AtlasEntity nameSapceEntity, HTableDescriptor htd, HColumnDescriptor[] hcdts) throws Exception { - String owner = htd.getOwnerString(); - String tblQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName); - AtlasEntity tableEntity = findTableEntityInAtlas(tblQualifiedName); - - if (tableEntity == null) { - LOG.info("Importing Table: " + tblQualifiedName); - - AtlasEntity entity = getTableEntity(nameSpace, tableName, owner, nameSapceEntity, htd); - - tableEntity = createEntityInAtlas(entity); - } - - List<AtlasEntity> cfEntities = createOrUpdateColumnFamilies(nameSpace, tableName, owner, hcdts, tableEntity); - List<AtlasObjectId> cfIDs = new ArrayList<>(); - - if (CollectionUtils.isNotEmpty(cfEntities)) { - for (AtlasEntity cfEntity : cfEntities) { - cfIDs.add(AtlasTypeUtil.getAtlasObjectId(cfEntity)); - } - } - - tableEntity.setAttribute(COLUMN_FAMILIES, cfIDs); - - return tableEntity; - } - - protected List<AtlasEntity> createOrUpdateColumnFamilies(String nameSpace, String tableName, String owner, HColumnDescriptor[] hcdts , AtlasEntity tableEntity) throws Exception { - List<AtlasEntity> ret = new ArrayList<>(); - - if (hcdts != null) { - AtlasObjectId tableId = AtlasTypeUtil.getAtlasObjectId(tableEntity); - - for (HColumnDescriptor hcdt : hcdts) { - String cfName = hcdt.getNameAsString(); - String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName); - AtlasEntity cfEntity = findColumnFamiltyEntityInAtlas(cfQualifiedName); - - if (cfEntity == null) { - LOG.info("Importing Column-family: " + cfQualifiedName); - - AtlasEntity entity = getColumnFamilyEntity(nameSpace, tableName, owner, hcdt, tableId); - - cfEntity = createEntityInAtlas(entity); - } - - ret.add(cfEntity); - } - } - - return ret; - } - - private AtlasEntity findNameSpaceEntityInAtlas(String nsQualifiedName) { - AtlasEntity ret = nameSpaceCache.get(nsQualifiedName); - - if (ret == null) { - try { - ret = findEntityInAtlas(NAMESPACE_TYPE, nsQualifiedName); - - if (ret != null) { - nameSpaceCache.put(nsQualifiedName, ret); - } - } catch (Exception e) { - ret = null; // entity doesn't exist in Atlas - } - } - - return ret; - } - - private AtlasEntity findTableEntityInAtlas(String tableQualifiedName) { - AtlasEntity ret = tableCache.get(tableQualifiedName); - - if (ret == null) { - try { - ret = findEntityInAtlas(TABLE_TYPE, tableQualifiedName); - - if (ret != null) { - tableCache.put(tableQualifiedName, ret); - } - } catch (Exception e) { - ret = null; // entity doesn't exist in Atlas - } - } - - return ret; - } - - private AtlasEntity findColumnFamiltyEntityInAtlas(String columnFamilyQualifiedName) { - AtlasEntity ret = columnFamilyCache.get(columnFamilyQualifiedName); - - if (ret == null) { - try { - ret = findEntityInAtlas(COLUMNFAMILY_TYPE, columnFamilyQualifiedName); - - if (ret != null) { - columnFamilyCache.put(columnFamilyQualifiedName, ret); - } - } catch (Exception e) { - ret = null; // entity doesn't exist in Atlas - } - } - - return ret; - } - - private AtlasEntity findEntityInAtlas(String typeName, String qualifiedName) throws Exception { - Map<String, String> attributes = Collections.singletonMap(QUALIFIED_NAME, qualifiedName); - - return atlasClientV2.getEntityByAttribute(typeName, attributes).getEntity(); - } - - private AtlasEntity getNameSpaceEntity(String nameSpace){ - AtlasEntity ret = new AtlasEntity(NAMESPACE_TYPE); - String qualifiedName = getNameSpaceQualifiedName(clusterName, nameSpace); - - ret.setAttribute(QUALIFIED_NAME, qualifiedName); - ret.setAttribute(CLUSTERNAME, clusterName); - ret.setAttribute(NAME, nameSpace); - ret.setAttribute(DESCRIPTION_ATTR, nameSpace); - - return ret; - } - - private AtlasEntity getTableEntity(String nameSpace, String tableName, String owner, AtlasEntity nameSpaceEntity, HTableDescriptor htd) { - AtlasEntity ret = new AtlasEntity(TABLE_TYPE); - String tableQualifiedName = getTableQualifiedName(clusterName, nameSpace, tableName); - - ret.setAttribute(QUALIFIED_NAME, tableQualifiedName); - ret.setAttribute(CLUSTERNAME, clusterName); - ret.setAttribute(NAMESPACE, AtlasTypeUtil.getAtlasObjectId(nameSpaceEntity)); - ret.setAttribute(NAME, tableName); - ret.setAttribute(DESCRIPTION_ATTR, tableName); - ret.setAttribute(OWNER, owner); - ret.setAttribute(URI, tableName); - ret.setAttribute(ATTR_TABLE_MAX_FILESIZE, htd.getMaxFileSize()); - ret.setAttribute(ATTR_TABLE_REPLICATION_PER_REGION, htd.getRegionReplication()); - ret.setAttribute(ATTR_TABLE_ISREADONLY, htd.isReadOnly()); - ret.setAttribute(ATTR_TABLE_ISCOMPACTION_ENABLED, htd.isCompactionEnabled()); - ret.setAttribute(ATTR_TABLE_DURABLILITY, (htd.getDurability() != null ? htd.getDurability().name() : null)); - - return ret; - } - - private AtlasEntity getColumnFamilyEntity(String nameSpace, String tableName, String owner, HColumnDescriptor hcdt, AtlasObjectId tableId){ - AtlasEntity ret = new AtlasEntity(COLUMNFAMILY_TYPE); - String cfName = hcdt.getNameAsString(); - String cfQualifiedName = getColumnFamilyQualifiedName(clusterName, nameSpace, tableName, cfName); - - ret.setAttribute(QUALIFIED_NAME, cfQualifiedName); - ret.setAttribute(CLUSTERNAME, clusterName); - ret.setAttribute(TABLE, tableId); - ret.setAttribute(NAME, cfName); - ret.setAttribute(DESCRIPTION_ATTR, cfName); - ret.setAttribute(OWNER, owner); - ret.setAttribute(ATTR_CF_BLOCK_CACHE_ENABLED, hcdt.isBlockCacheEnabled()); - ret.setAttribute(ATTR_CF_BLOOMFILTER_TYPE, (hcdt.getBloomFilterType() != null ? hcdt.getBloomFilterType().name():null)); - ret.setAttribute(ATTR_CF_CACHED_BLOOM_ON_WRITE, hcdt.isCacheBloomsOnWrite()); - ret.setAttribute(ATTR_CF_CACHED_DATA_ON_WRITE, hcdt.isCacheDataOnWrite()); - ret.setAttribute(ATTR_CF_CACHED_INDEXES_ON_WRITE, hcdt.isCacheIndexesOnWrite()); - ret.setAttribute(ATTR_CF_COMPACTION_COMPRESSION_TYPE, (hcdt.getCompactionCompressionType() != null ? hcdt.getCompactionCompressionType().name():null)); - ret.setAttribute(ATTR_CF_COMPRESSION_TYPE, (hcdt.getCompressionType() != null ? hcdt.getCompressionType().name():null)); - ret.setAttribute(ATTR_CF_DATA_BLOCK_ENCODING, (hcdt.getDataBlockEncoding() != null ? hcdt.getDataBlockEncoding().name():null)); - ret.setAttribute(ATTR_CF_ENCRYPTION_TYPE, hcdt.getEncryptionType()); - ret.setAttribute(ATTR_CF_EVICT_BLOCK_ONCLOSE, hcdt.isEvictBlocksOnClose()); - ret.setAttribute(ATTR_CF_KEEP_DELETE_CELLS, ( hcdt.getKeepDeletedCells() != null ? hcdt.getKeepDeletedCells().name():null)); - ret.setAttribute(ATTR_CF_MAX_VERSIONS, hcdt.getMaxVersions()); - ret.setAttribute(ATTR_CF_MIN_VERSIONS, hcdt.getMinVersions()); - ret.setAttribute(ATTR_CF_PREFETCH_BLOCK_ONOPEN, hcdt.isPrefetchBlocksOnOpen()); - ret.setAttribute(ATTR_CF_TTL, hcdt.getTimeToLive()); - - return ret; - } - - private AtlasEntity createEntityInAtlas(AtlasEntity entity) throws Exception { - AtlasEntity ret = null; - EntityMutationResponse response = atlasClientV2.createEntity(new AtlasEntity.AtlasEntityWithExtInfo(entity)); - List<AtlasEntityHeader> entities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); - - if (CollectionUtils.isNotEmpty(entities)) { - AtlasEntity.AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid()); - - ret = getByGuidResponse.getEntity(); - - LOG.info ("Created entity: type=" + ret.getTypeName() + ", guid=" + ret.getGuid()); - } - - return ret; - } - - private void checkArgs(String[] args) throws Exception { - String option = args.length > 0 ? args[0] : null; - String value = args.length > 1 ? args[1] : null; - - if (option != null && value == null) { - if (option.equalsIgnoreCase(NAMESPACE_FLAG) || option.equalsIgnoreCase(NAMESPACE_FULL_FLAG) || - option.equalsIgnoreCase(TABLE_FLAG) || option.equalsIgnoreCase(TABLE_FULL_FLAG)) { - - System.out.println("Usage: import-hbase.sh [-n <namespace> OR --namespace <namespace>] [-t <table> OR --table <table>]"); - - throw new Exception("Incorrect arguments.."); - } - } - } - - /** - * Construct the qualified name used to uniquely identify a ColumnFamily instance in Atlas. - * @param clusterName Name of the cluster to which the Hbase component belongs - * @param nameSpace Name of the Hbase database to which the Table belongs - * @param tableName Name of the Hbase table - * @param columnFamily Name of the ColumnFamily - * @return Unique qualified name to identify the Table instance in Atlas. - */ - private static String getColumnFamilyQualifiedName(String clusterName, String nameSpace, String tableName, String columnFamily) { - tableName = stripNameSpace(tableName.toLowerCase()); - return String.format(HBASE_COLUMN_FAMILY_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, columnFamily.toLowerCase(), clusterName); - } - - /** - * Construct the qualified name used to uniquely identify a Table instance in Atlas. - * @param clusterName Name of the cluster to which the Hbase component belongs - * @param nameSpace Name of the Hbase database to which the Table belongs - * @param tableName Name of the Hbase table - * @return Unique qualified name to identify the Table instance in Atlas. - */ - private static String getTableQualifiedName(String clusterName, String nameSpace, String tableName) { - tableName = stripNameSpace(tableName.toLowerCase()); - return String.format(HBASE_TABLE_QUALIFIED_NAME_FORMAT, nameSpace.toLowerCase(), tableName, clusterName); - } - - /** - * Construct the qualified name used to uniquely identify a Hbase NameSpace instance in Atlas. - * @param clusterName Name of the cluster to which the Hbase component belongs - * @param nameSpace Name of the NameSpace - * @return Unique qualified name to identify the HBase NameSpace instance in Atlas. - */ - private static String getNameSpaceQualifiedName(String clusterName, String nameSpace) { - return String.format(HBASE_NAMESPACE_QUALIFIED_NAME, nameSpace.toLowerCase(), clusterName); - } - - private static String stripNameSpace(String tableName){ - tableName = tableName.substring(tableName.indexOf(":")+1); - - return tableName; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/7f5a665e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 09c17a9..65cdd99 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -44,6 +44,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; @@ -62,6 +63,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -102,14 +106,16 @@ public class HiveMetaStoreBridge { try { Options options = new Options(); - options.addOption("d", "database", true, "Databbase name"); + options.addOption("d", "database", true, "Database name"); options.addOption("t", "table", true, "Table name"); + options.addOption("f", "filename", true, "Filename"); options.addOption("failOnError", false, "failOnError"); CommandLine cmd = new BasicParser().parse(options, args); boolean failOnError = cmd.hasOption("failOnError"); String databaseToImport = cmd.getOptionValue("d"); String tableToImport = cmd.getOptionValue("t"); + String fileToImport = cmd.getOptionValue("f"); Configuration atlasConf = ApplicationProperties.get(); String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT); @@ -131,12 +137,40 @@ public class HiveMetaStoreBridge { HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2); - hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError); + if (StringUtils.isNotEmpty(fileToImport)) { + File f = new File(fileToImport); + + if (f.exists() && f.canRead()) { + BufferedReader br = new BufferedReader(new FileReader(f)); + String line = null; + + while((line = br.readLine()) != null) { + String val[] = line.split(":"); + + if (ArrayUtils.isNotEmpty(val)) { + databaseToImport = val[0]; + + if (val.length > 1) { + tableToImport = val[1]; + } else { + tableToImport = ""; + } + + hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError); + } + } + + exitCode = EXIT_CODE_SUCCESS; + } else { + LOG.error("Failed to read the input file: " + fileToImport); + } + } else { + hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError); + } exitCode = EXIT_CODE_SUCCESS; } catch(ParseException e) { LOG.error("Failed to parse arguments. Error: ", e.getMessage()); - printUsage(); } catch(Exception e) { LOG.error("Import failed", e); @@ -157,6 +191,12 @@ public class HiveMetaStoreBridge { System.out.println("Usage 3: import-hive.sh"); System.out.println(" Imports all databases and tables..."); System.out.println(); + System.out.println("Usage 4: import-hive.sh -f <filename>"); + System.out.println(" Imports all databases and tables in the file..."); + System.out.println(" Format:"); + System.out.println(" database1:tbl1"); + System.out.println(" database1:tbl2"); + System.out.println(" database2:tbl2"); System.out.println(); }
