Repository: atlas Updated Branches: refs/heads/master 31eb3664c -> bd39a509a
ATLAS-2511: updated import-hive utility to add options to selectively import given database/tables Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bd39a509 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bd39a509 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bd39a509 Branch: refs/heads/master Commit: bd39a509a20ff7860fdfdcab67e46cc503038f1e Parents: 31eb366 Author: rmani <[email protected]> Authored: Thu Mar 22 12:33:47 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Sun Mar 25 23:37:11 2018 -0700 ---------------------------------------------------------------------- addons/hive-bridge/src/bin/import-hive.sh | 8 +- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 127 +++++++++++----- .../org/apache/atlas/hive/hook/HiveHook.java | 144 ++++++++++--------- .../hive/bridge/HiveMetaStoreBridgeTest.java | 19 +-- 4 files changed, 179 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/bin/import-hive.sh ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/bin/import-hive.sh b/addons/hive-bridge/src/bin/import-hive.sh index 47581ac..98f4c84 100755 --- a/addons/hive-bridge/src/bin/import-hive.sh +++ b/addons/hive-bridge/src/bin/import-hive.sh @@ -31,6 +31,8 @@ done BASEDIR=`dirname ${PRG}` BASEDIR=`cd ${BASEDIR}/..;pwd` +allargs=$@ + if test -z "${JAVA_HOME}" then JAVA_BIN=`which java` @@ -128,8 +130,8 @@ done echo "Log file for import is $LOGFILE" -"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge +"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.hive.bridge.HiveMetaStoreBridge $allargs RETVAL=$? -[ $RETVAL -eq 0 ] && echo Hive Data Model imported successfully!!! -[ $RETVAL -ne 0 ] && echo Failed to import Hive Data Model!!! +[ $RETVAL -eq 0 ] && echo Hive Meta Data imported successfully!!! +[ $RETVAL -ne 0 ] && echo Failed to import Hive Meta Data!!! http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 51df8d2..09c17a9 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -36,11 +36,11 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.commons.cli.ParseException; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; @@ -86,7 +86,9 @@ public class HiveMetaStoreBridge { public static final String SEP = ":".intern(); public static final String HDFS_PATH = "hdfs_path"; - private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_FAILED = 1; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; private final HdfsNameServiceResolver hdfsNameServiceResolver = HdfsNameServiceResolver.getInstance(); private final String clusterName; @@ -95,16 +97,27 @@ public class HiveMetaStoreBridge { private final boolean convertHdfsPathToLowerCase; - public static void main(String[] args) throws AtlasHookException { - try { - Configuration atlasConf = ApplicationProperties.get(); - String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT); + public static void main(String[] args) { + int exitCode = EXIT_CODE_FAILED; - if (atlasEndpoint == null || atlasEndpoint.length == 0){ + try { + Options options = new Options(); + options.addOption("d", "database", true, "Databbase name"); + options.addOption("t", "table", true, "Table name"); + options.addOption("failOnError", false, "failOnError"); + + CommandLine cmd = new BasicParser().parse(options, args); + boolean failOnError = cmd.hasOption("failOnError"); + String databaseToImport = cmd.getOptionValue("d"); + String tableToImport = cmd.getOptionValue("t"); + Configuration atlasConf = ApplicationProperties.get(); + String[] atlasEndpoint = atlasConf.getStringArray(ATLAS_ENDPOINT); + + if (atlasEndpoint == null || atlasEndpoint.length == 0) { atlasEndpoint = new String[] { DEFAULT_ATLAS_URL }; } - AtlasClientV2 atlasClientV2; + final AtlasClientV2 atlasClientV2; if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); @@ -116,17 +129,35 @@ public class HiveMetaStoreBridge { atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint); } - Options options = new Options(); - CommandLineParser parser = new BasicParser(); - CommandLine cmd = parser.parse(options, args); - boolean failOnError = cmd.hasOption("failOnError"); - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(atlasConf, new HiveConf(), atlasClientV2); - hiveMetaStoreBridge.importHiveMetadata(failOnError); + hiveMetaStoreBridge.importHiveMetadata(databaseToImport, tableToImport, failOnError); + + exitCode = EXIT_CODE_SUCCESS; + } catch(ParseException e) { + LOG.error("Failed to parse arguments. Error: ", e.getMessage()); + + printUsage(); } catch(Exception e) { - throw new AtlasHookException("HiveMetaStoreBridge.main() failed.", e); + LOG.error("Import failed", e); } + + System.exit(exitCode); + } + + private static void printUsage() { + System.out.println(); + System.out.println(); + System.out.println("Usage 1: import-hive.sh [-d <database> OR --database <database>] " ); + System.out.println(" Imports specified database and its tables ..."); + System.out.println(); + System.out.println("Usage 2: import-hive.sh [-d <database> OR --database <database>] [-t <table> OR --table <table>]"); + System.out.println(" Imports specified table within that database ..."); + System.out.println(); + System.out.println("Usage 3: import-hive.sh"); + System.out.println(" Imports all databases and tables..."); + System.out.println(); + System.out.println(); } /** @@ -174,23 +205,33 @@ public class HiveMetaStoreBridge { @VisibleForTesting - public void importHiveMetadata(boolean failOnError) throws Exception { + public void importHiveMetadata(String databaseToImport, String tableToImport, boolean failOnError) throws Exception { LOG.info("Importing Hive metadata"); - importDatabases(failOnError); + importDatabases(failOnError, databaseToImport, tableToImport); } - private void importDatabases(boolean failOnError) throws Exception { - List<String> databases = hiveClient.getAllDatabases(); + private void importDatabases(boolean failOnError, String databaseToImport, String tableToImport) throws Exception { + final List<String> databaseNames; + + if (StringUtils.isEmpty(databaseToImport)) { + databaseNames = hiveClient.getAllDatabases(); + } else { + databaseNames = hiveClient.getDatabasesByPattern(databaseToImport); + } - LOG.info("Found {} databases", databases.size()); + if(!CollectionUtils.isEmpty(databaseNames)) { + LOG.info("Found {} databases", databaseNames.size()); - for (String databaseName : databases) { - AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName); + for (String databaseName : databaseNames) { + AtlasEntityWithExtInfo dbEntity = registerDatabase(databaseName); - if (dbEntity != null) { - importTables(dbEntity.getEntity(), databaseName, failOnError); + if (dbEntity != null) { + importTables(dbEntity.getEntity(), databaseName, tableToImport, failOnError); + } } + } else { + LOG.info("No database found"); } } @@ -201,25 +242,35 @@ public class HiveMetaStoreBridge { * @param failOnError * @throws Exception */ - private int importTables(AtlasEntity dbEntity, String databaseName, final boolean failOnError) throws Exception { - List<String> hiveTables = hiveClient.getAllTables(databaseName); + private int importTables(AtlasEntity dbEntity, String databaseName, String tblName, final boolean failOnError) throws Exception { + int tablesImported = 0; - LOG.info("Found {} tables in database {}", hiveTables.size(), databaseName); + final List<String> tableNames; - int tablesImported = 0; + if (StringUtils.isEmpty(tblName)) { + tableNames = hiveClient.getAllTables(databaseName); + } else { + tableNames = hiveClient.getTablesByPattern(databaseName, tblName); + } - try { - for (String tableName : hiveTables) { - int imported = importTable(dbEntity, databaseName, tableName, failOnError); + if(!CollectionUtils.isEmpty(tableNames)) { + LOG.info("Found {} tables to import in database {}", tableNames.size(), databaseName); - tablesImported += imported; - } - } finally { - if (tablesImported == hiveTables.size()) { - LOG.info("Successfully imported all {} tables from database {}", tablesImported, databaseName); - } else { - LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, hiveTables.size(), databaseName); + try { + for (String tableName : tableNames) { + int imported = importTable(dbEntity, databaseName, tableName, failOnError); + + tablesImported += imported; + } + } finally { + if (tablesImported == tableNames.size()) { + LOG.info("Successfully imported {} tables from database {}", tablesImported, databaseName); + } else { + LOG.error("Imported {} of {} tables from database {}. Please check logs for errors during import", tablesImported, tableNames.size(), databaseName); + } } + } else { + LOG.info("No tables to import in database {}", databaseName); } return tablesImported; http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 3bf0aab..78f2e83 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -82,77 +82,81 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { LOG.debug("==> HiveHook.run({})", hookContext.getOperationName()); } - HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName()); - AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext); - - BaseHiveEvent event = null; - - switch (oper) { - case CREATEDATABASE: - event = new CreateDatabase(context); - break; - - case DROPDATABASE: - event = new DropDatabase(context); - break; - - case ALTERDATABASE: - case ALTERDATABASE_OWNER: - event = new AlterDatabase(context); - break; - - case CREATETABLE: - event = new CreateTable(context, true); - break; - - case DROPTABLE: - case DROPVIEW: - event = new DropTable(context); - break; - - case CREATETABLE_AS_SELECT: - case CREATEVIEW: - case ALTERVIEW_AS: - case LOAD: - case EXPORT: - case IMPORT: - case QUERY: - case TRUNCATETABLE: - event = new CreateHiveProcess(context); - break; - - case ALTERTABLE_FILEFORMAT: - case ALTERTABLE_CLUSTER_SORT: - case ALTERTABLE_BUCKETNUM: - case ALTERTABLE_PROPERTIES: - case ALTERVIEW_PROPERTIES: - case ALTERTABLE_SERDEPROPERTIES: - case ALTERTABLE_SERIALIZER: - case ALTERTABLE_ADDCOLS: - case ALTERTABLE_REPLACECOLS: - case ALTERTABLE_PARTCOLTYPE: - case ALTERTABLE_LOCATION: - event = new AlterTable(context); - break; - - case ALTERTABLE_RENAME: - case ALTERVIEW_RENAME: - event = new AlterTableRename(context); - break; - - case ALTERTABLE_RENAMECOL: - event = new AlterTableRenameCol(context); - break; - - default: - if (LOG.isDebugEnabled()) { - LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName()); - } - break; - } + try { + HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName()); + AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext); + + BaseHiveEvent event = null; + + switch (oper) { + case CREATEDATABASE: + event = new CreateDatabase(context); + break; + + case DROPDATABASE: + event = new DropDatabase(context); + break; + + case ALTERDATABASE: + case ALTERDATABASE_OWNER: + event = new AlterDatabase(context); + break; + + case CREATETABLE: + event = new CreateTable(context, true); + break; + + case DROPTABLE: + case DROPVIEW: + event = new DropTable(context); + break; + + case CREATETABLE_AS_SELECT: + case CREATEVIEW: + case ALTERVIEW_AS: + case LOAD: + case EXPORT: + case IMPORT: + case QUERY: + case TRUNCATETABLE: + event = new CreateHiveProcess(context); + break; + + case ALTERTABLE_FILEFORMAT: + case ALTERTABLE_CLUSTER_SORT: + case ALTERTABLE_BUCKETNUM: + case ALTERTABLE_PROPERTIES: + case ALTERVIEW_PROPERTIES: + case ALTERTABLE_SERDEPROPERTIES: + case ALTERTABLE_SERIALIZER: + case ALTERTABLE_ADDCOLS: + case ALTERTABLE_REPLACECOLS: + case ALTERTABLE_PARTCOLTYPE: + case ALTERTABLE_LOCATION: + event = new AlterTable(context); + break; + + case ALTERTABLE_RENAME: + case ALTERVIEW_RENAME: + event = new AlterTableRename(context); + break; + + case ALTERTABLE_RENAMECOL: + event = new AlterTableRenameCol(context); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug("HiveHook.run({}): operation ignored", hookContext.getOperationName()); + } + break; + } - if (event != null) { - super.notifyEntities(event.getNotificationMessages()); + if (event != null) { + super.notifyEntities(event.getNotificationMessages()); + } + } catch (Throwable t) { + LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t); } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/atlas/blob/bd39a509/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java index 271511e..d55aa53 100644 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java @@ -97,7 +97,7 @@ public class HiveMetaStoreBridgeTest { getEntity(HiveDataTypes.HIVE_DB.getName(), AtlasClient.GUID, "72e06b34-9151-4023-aa9d-b82103a50e76"))).getEntity()); HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); - bridge.importHiveMetadata(true); + bridge.importHiveMetadata(null, null, true); // verify update is called verify(atlasClientV2).updateEntity(anyObject()); @@ -126,7 +126,8 @@ public class HiveMetaStoreBridgeTest { when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")) .thenReturn(createTableReference()); - String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(0)); + Table testTable = hiveTables.get(0); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, @@ -136,7 +137,7 @@ public class HiveMetaStoreBridgeTest { HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); - bridge.importHiveMetadata(true); + bridge.importHiveMetadata(null, null, true); // verify update is called on table verify(atlasClientV2, times(2)).updateEntity(anyObject()); @@ -207,7 +208,7 @@ public class HiveMetaStoreBridgeTest { HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); try { - bridge.importHiveMetadata(true); + bridge.importHiveMetadata(null, null, true); } catch (Exception e) { Assert.fail("Partition with null key caused import to fail with exception ", e); } @@ -231,7 +232,8 @@ public class HiveMetaStoreBridgeTest { when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")) .thenReturn(createTableReference()); - String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1)); + Table testTable = hiveTables.get(1); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, @@ -241,7 +243,7 @@ public class HiveMetaStoreBridgeTest { HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); try { - bridge.importHiveMetadata(false); + bridge.importHiveMetadata(null, null, false); } catch (Exception e) { Assert.fail("Table registration failed with exception", e); } @@ -267,7 +269,8 @@ public class HiveMetaStoreBridgeTest { when(atlasEntityWithExtInfo.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")) .thenReturn(createTableReference()); - String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, hiveTables.get(1)); + Table testTable = hiveTables.get(1); + String processQualifiedName = HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME, testTable); when(atlasClientV2.getEntityByAttribute(HiveDataTypes.HIVE_PROCESS.getName(), Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, @@ -277,7 +280,7 @@ public class HiveMetaStoreBridgeTest { HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClientV2); try { - bridge.importHiveMetadata(true); + bridge.importHiveMetadata(null, null, true); Assert.fail("Table registration is supposed to fail"); } catch (Exception e) { //Expected
